171 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			171 lines
		
	
	
		
			4.3 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
/**
 | 
						|
 * Copyright (c) 2021 OceanBase
 | 
						|
 * OceanBase CE is licensed under Mulan PubL v2.
 | 
						|
 * You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
 * You may obtain a copy of Mulan PubL v2 at:
 | 
						|
 *          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
 * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
 * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
 * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
 * See the Mulan PubL v2 for more details.
 | 
						|
 */
 | 
						|
 | 
						|
#ifndef RPC_TESTING_H
 | 
						|
#define RPC_TESTING_H
 | 
						|
 | 
						|
#include <memory>
 | 
						|
#include "rpc/frame/ob_net_easy.h"
 | 
						|
#include "rpc/obrpc/ob_rpc_handler.h"
 | 
						|
#include "rpc/obrpc/ob_rpc_packet.h"
 | 
						|
#include "rpc/frame/ob_req_deliver.h"
 | 
						|
#include "rpc/ob_request.h"
 | 
						|
// for convenience of outer use
 | 
						|
#include "rpc/obrpc/ob_rpc_proxy.h"
 | 
						|
#include "rpc/obrpc/ob_rpc_processor.h"
 | 
						|
 | 
						|
namespace rpctesting {
 | 
						|
using namespace oceanbase;
 | 
						|
using namespace oceanbase::common;
 | 
						|
using namespace oceanbase::rpc;
 | 
						|
using namespace oceanbase::rpc::frame;
 | 
						|
using namespace oceanbase::obrpc;
 | 
						|
 | 
						|
class Service
 | 
						|
{
 | 
						|
  // Deliver is the class that transfers the result of unpacking RpcHandler to the corresponding queue. Here will be countered
 | 
						|
  // The deliver method of Service facilitates the integration of logic into the Service class.
 | 
						|
  class Deliver
 | 
						|
      : public ObReqQDeliver {
 | 
						|
  public:
 | 
						|
    Deliver(ObiReqQHandler &qhandler, Service &service)
 | 
						|
        : ObReqQDeliver(qhandler),
 | 
						|
          service_(service)
 | 
						|
    {}
 | 
						|
    int init() override { return OB_SUCCESS; }
 | 
						|
    void stop() override {}
 | 
						|
    int deliver(rpc::ObRequest &req) override;
 | 
						|
  private:
 | 
						|
    Service &service_;
 | 
						|
  };
 | 
						|
  // Translator is responsible for translating an ObRequest request into the corresponding Processor. Here will
 | 
						|
  // Reverse the translate method of Service, so that the logic is concentrated in the Service.
 | 
						|
  class Translator
 | 
						|
      : public ObReqTranslator {
 | 
						|
  public:
 | 
						|
    Translator(Service &service)
 | 
						|
        : service_(service)
 | 
						|
    {}
 | 
						|
  protected:
 | 
						|
    ObReqProcessor *get_processor(ObRequest &req) override;
 | 
						|
  private:
 | 
						|
    Service &service_;
 | 
						|
  };
 | 
						|
 | 
						|
public:
 | 
						|
  Service(int listen_port=33244)
 | 
						|
      : easy_(),
 | 
						|
        translator_(*this),
 | 
						|
        qhandler_(translator_),
 | 
						|
        deliver_(qhandler_, *this),
 | 
						|
        handler_(deliver_),
 | 
						|
        transport_(nullptr),
 | 
						|
        listen_port_(listen_port),
 | 
						|
        try_listen_cnt_(0),
 | 
						|
        queue_(),
 | 
						|
        proc_map_()
 | 
						|
  {}
 | 
						|
  virtual ~Service() {}
 | 
						|
 | 
						|
  int init();
 | 
						|
  int get_listen_port() const { return listen_port_; }
 | 
						|
  int get_proxy(ObRpcProxy &proxy) { return proxy.init(transport_); }
 | 
						|
  const common::ObAddr get_dst() const
 | 
						|
  { return common::ObAddr(common::ObAddr::IPV4, "127.0.0.1", get_listen_port()); }
 | 
						|
 | 
						|
  template <class Proc>
 | 
						|
  int reg_processor(Proc *p);
 | 
						|
 | 
						|
 | 
						|
protected:
 | 
						|
  ObReqProcessor *translate(ObRequest &req);
 | 
						|
  int deliver(ObRequest &req);
 | 
						|
 | 
						|
private:
 | 
						|
  ObNetEasy easy_;
 | 
						|
  Translator translator_;
 | 
						|
  ObReqQHandler qhandler_;
 | 
						|
  Deliver deliver_;
 | 
						|
  ObRpcHandler handler_;
 | 
						|
  ObReqTransport *transport_;
 | 
						|
 | 
						|
  int listen_port_;
 | 
						|
  int try_listen_cnt_;
 | 
						|
 | 
						|
  ObReqQueueThread queue_;
 | 
						|
  ObReqProcessor* proc_map_[65536];
 | 
						|
};
 | 
						|
 | 
						|
int Service::Deliver::deliver(ObRequest &req)
 | 
						|
{
 | 
						|
  return service_.deliver(req);
 | 
						|
}
 | 
						|
 | 
						|
ObReqProcessor *Service::Translator::get_processor(ObRequest &req)
 | 
						|
{
 | 
						|
  return service_.translate(req);
 | 
						|
}
 | 
						|
 | 
						|
int Service::init()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  queue_.set_qhandler(&qhandler_);
 | 
						|
  if (OB_FAIL(queue_.get_thread().start())) {
 | 
						|
  }
 | 
						|
  if (OB_SUCC(ret)) {
 | 
						|
    ObNetOptions opts;
 | 
						|
    opts.rpc_io_cnt_ = 1;
 | 
						|
    opts.mysql_io_cnt_ = 1;
 | 
						|
    if (OB_SUCC(easy_.init(opts))) {
 | 
						|
      ret = easy_.start();
 | 
						|
    }
 | 
						|
  }
 | 
						|
  if (OB_SUCC(ret)) {
 | 
						|
    ret = easy_.add_rpc_listen(listen_port_, handler_, transport_);
 | 
						|
    while (try_listen_cnt_++ < 1000 && OB_FAIL(ret)) {
 | 
						|
      ret = easy_.add_rpc_listen(++listen_port_, handler_, transport_);
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
ObReqProcessor *Service::translate(ObRequest &req)
 | 
						|
{
 | 
						|
  const ObRpcPacket &pkt
 | 
						|
      = reinterpret_cast<const ObRpcPacket&>(req.get_packet());
 | 
						|
  return proc_map_[pkt.get_pcode()];
 | 
						|
}
 | 
						|
 | 
						|
int Service::deliver(ObRequest &req)
 | 
						|
{
 | 
						|
  static constexpr int64_t MAX_QUEUE_LEN = 65536;
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  if (!queue_.push(&req, MAX_QUEUE_LEN)) {
 | 
						|
    ret = OB_QUEUE_OVERFLOW;
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
template <class Proc>
 | 
						|
int Service::reg_processor(Proc *p)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  proc_map_[Proc::PCODE] = p;
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
 | 
						|
 | 
						|
}  // rpctesting
 | 
						|
 | 
						|
#endif /* RPC_TESTING_H */
 |