/** * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. * You may obtain a copy of Mulan PubL v2 at: * http://license.coscl.org.cn/MulanPubL-2.0 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. * See the Mulan PubL v2 for more details. */ #ifndef OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX #define OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX #include #include #include "lib/ob_errno.h" #include "lib/utility/ob_macro_utils.h" #include "lib/utility/ob_print_utils.h" #include "storage/tx/ob_committer_define.h" namespace oceanbase { namespace transaction { template class ObMailBoxMgr; template class ObMail { public: int init(int64_t from, int64_t to, uint64_t size, const MailType &mail) { if (NULL != mail_) { std::free(mail_); } from_ = from; to_ = to; size_ = size; mail_ = (MailType*)std::malloc(size); std::memcpy((void*)mail_, (void*)(&mail), size); return OB_SUCCESS; } ObMail() { mail_ = NULL; } ObMail(const ObMail &other) { from_ = other.from_; to_ = other.to_; size_ = other.size_; mail_ = (MailType*)std::malloc(size_); std::memcpy((void*)mail_, (void*)(other.mail_), size_); } ~ObMail() { if (NULL != mail_) { std::free(mail_); } } ObMail& operator=(const ObMail& other) { if (NULL != mail_) { std::free(mail_); } from_ = other.from_; to_ = other.to_; size_ = other.size_; mail_ = (MailType*)std::malloc(size_); std::memcpy((void*)mail_, (void*)(other.mail_), size_); return *this; } /* ObMail& operator=(const ObMail &other) */ /* { */ /* from_ = other.from_; */ /* to_ = other.to_; */ /* size_ = other.size_; */ /* mail_ = (MailType*)std::malloc(size_); */ /* std::memcpy((void*)mail_, (void*)(other.mail_), size_); */ /* } */ int64_t from_; int64_t to_; uint64_t size_; MailType* mail_; TO_STRING_KV(K_(from), K_(to), K_(*mail)); }; template class ObMailHandler { public: virtual int handle(const ObMail& mail) = 0; int64_t sign_ = 39; virtual TO_STRING_KV(K_(sign)); }; template class ObMailBox { public: int64_t addr_; std::deque> mailbox_; ObMailHandler *ctx_; ObMailBoxMgr *mailbox_mgr_; ~ObMailBox() { mailbox_.clear(); } int init(int64_t addr, ObMailBoxMgr *mailbox_mgr, ObMailHandler *ctx); int handle(const bool must_have = true); int handle_all(); int send(const ObMail& mail, const int64_t receiver); int send_to_head(const ObMail& mail, const int64_t receiver); int fetch_mail(ObMail& mail); int64_t to_string(char *buffer, const int64_t size) const; }; template class ObMailBoxMgr { public: int64_t counter_ = 0; std::map*> mgr_; int register_mailbox(int64_t &addr, ObMailBox &mailbox, ObMailHandler *ctx); int send(const ObMail& mail, const int64_t receive); int send_to_head(const ObMail& mail, const int64_t receive); void reset(); }; template int ObMailBox::init(int64_t addr, ObMailBoxMgr *mailbox_mgr, ObMailHandler *ctx) { int ret = OB_SUCCESS; mailbox_.clear(); addr_ = addr; ctx_ = ctx; mailbox_mgr_ = mailbox_mgr; return ret; } template int ObMailBox::fetch_mail(ObMail &mail) { int ret = OB_SUCCESS; if (mailbox_.empty()) { TRANS_LOG(ERROR, "mailbox is empty, but must handle", K(*this)); ob_abort(); } else { mail = mailbox_.front(); mailbox_.pop_front(); } return ret; } template int ObMailBox::handle(const bool must_have) { int ret = OB_SUCCESS; if (must_have && mailbox_.empty()) { TRANS_LOG(ERROR, "mailbox is empty, but must handle", K(*this)); ob_abort(); } else if (mailbox_.empty()) { ret = OB_SUCCESS; } else { ObMail mail = mailbox_.front(); mailbox_.pop_front(); ret = ctx_->handle(mail); } return ret; } template int ObMailBox::handle_all() { int ret = OB_SUCCESS; while (OB_SUCC(ret) && !mailbox_.empty()) { ObMail mail = mailbox_.front(); mailbox_.pop_front(); ret = ctx_->handle(mail); } return ret; } template int ObMailBox::send(const ObMail& mail, const int64_t receiver) { int ret = OB_SUCCESS; ret = mailbox_mgr_->send(mail, receiver); return ret; } template int ObMailBox::send_to_head(const ObMail& mail, const int64_t receiver) { int ret = OB_SUCCESS; ret = mailbox_mgr_->send_to_head(mail, receiver); return ret; } template int64_t ObMailBox::to_string(char *buffer, const int64_t size) const { int64_t pos = 0; if (nullptr != buffer && size > 0) { databuff_printf(buffer, size, pos, "{addr: %ld, DEQUE: [", addr_); for (auto it = mailbox_.begin(); it != mailbox_.end(); ++it) { databuff_printf(buffer, size, pos, "(%s), ", to_cstring(*it)); } databuff_printf(buffer, size, pos, "]}"); } return pos; } template int ObMailBoxMgr::register_mailbox(int64_t &addr, ObMailBox &mailbox, ObMailHandler *ctx) { int ret = OB_SUCCESS; addr = ++counter_; ret = mailbox.init(addr, this, ctx); mgr_[addr] = &mailbox; TRANS_LOG(INFO, "register mailbox", K(mailbox), KP(ctx)); return ret; } template void ObMailBoxMgr::reset() { counter_ = 0; mgr_.clear(); TRANS_LOG(INFO, "reset mailbox",K(this)); } template int ObMailBoxMgr::send(const ObMail& mail, const int64_t receiver) { int ret = OB_SUCCESS; if (mgr_.count(mail.to_) != 0) { mgr_[receiver]->mailbox_.push_back(mail); TRANS_LOG(INFO, "send mailbox success", K(ret), K(mail), K(*mgr_[receiver])); } return ret; } template int ObMailBoxMgr::send_to_head(const ObMail& mail, const int64_t receiver) { int ret = OB_SUCCESS; if (mgr_.count(mail.to_) != 0) { mgr_[receiver]->mailbox_.push_front(mail); TRANS_LOG(INFO, "send to mailbox front success", K(ret), K(mail), K(*mgr_[receiver])); } return ret; } } // namespace transaction } // namespace oceanbase #endif // OCEANBASE_UNITTEST_STORAGE_TX_OB_MAILBOX