237 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
			
		
		
	
	
			237 lines
		
	
	
		
			7.2 KiB
		
	
	
	
		
			C++
		
	
	
	
	
	
// Copyright (c) 2021 OceanBase
 | 
						|
// OceanBase is licensed under Mulan PubL v2.
 | 
						|
// You can use this software according to the terms and conditions of the Mulan PubL v2.
 | 
						|
// You may obtain a copy of Mulan PubL v2 at:
 | 
						|
//          http://license.coscl.org.cn/MulanPubL-2.0
 | 
						|
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
 | 
						|
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
 | 
						|
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
 | 
						|
// See the Mulan PubL v2 for more details.
 | 
						|
 | 
						|
#define USING_LOG_PREFIX CLOG
 | 
						|
#include "ob_log_client.h"
 | 
						|
#include "logservice/palf/palf_env.h"
 | 
						|
#include "logservice/palf/palf_handle.h"
 | 
						|
#include "lib/utility/ob_macro_utils.h"
 | 
						|
#include "lib/thread/ob_thread_name.h"        // set_thread_name
 | 
						|
#include "lib/function/ob_function.h"         // ObFunction
 | 
						|
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_proxy.h"            // RpcProxy
 | 
						|
#include "mittest/palf_cluster/logservice/log_service.h"            // LogService
 | 
						|
 | 
						|
namespace oceanbase
 | 
						|
{
 | 
						|
using namespace common;
 | 
						|
using namespace palf;
 | 
						|
namespace palfcluster
 | 
						|
{
 | 
						|
int64_t c_append_cnt = 0;
 | 
						|
int64_t c_rt = 0;
 | 
						|
 | 
						|
ObLogClient::ObLogClient()
 | 
						|
    : self_(),
 | 
						|
      palf_id_(-1),
 | 
						|
      rpc_proxy_(NULL),
 | 
						|
      log_handler_(),
 | 
						|
      lock_(),
 | 
						|
      log_size_(-1),
 | 
						|
      election_priority_(),
 | 
						|
      total_num_(0),
 | 
						|
      is_inited_(false)
 | 
						|
  {}
 | 
						|
 | 
						|
ObLogClient::~ObLogClient()
 | 
						|
{
 | 
						|
  destroy();
 | 
						|
}
 | 
						|
 | 
						|
void ObLogClient::destroy()
 | 
						|
{
 | 
						|
  if (IS_INIT) {
 | 
						|
    is_inited_ = false;
 | 
						|
    rpc_proxy_ = NULL;
 | 
						|
    palf_id_ = -1;
 | 
						|
    log_handler_.destroy();
 | 
						|
    log_size_ = -1;
 | 
						|
  }
 | 
						|
}
 | 
						|
 | 
						|
int ObLogClient::init(const common::ObAddr &self,
 | 
						|
                      const int64_t palf_id,
 | 
						|
                      obrpc::PalfClusterRpcProxy *rpc_proxy,
 | 
						|
                      LogService *log_service)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  if (OB_UNLIKELY(is_inited_)) {
 | 
						|
    ret = OB_INIT_TWICE;
 | 
						|
    CLOG_LOG(WARN, "ObLogClient has been inited", K(ret));
 | 
						|
  } else if (false == self.is_valid()|| OB_ISNULL(rpc_proxy) || palf_id < 0) {
 | 
						|
    ret = OB_INVALID_ARGUMENT;
 | 
						|
    CLOG_LOG(WARN, "invalid argument", K(ret), K(self), K(palf_id), K(rpc_proxy));
 | 
						|
  } else {
 | 
						|
    for (int i = 0; i < REMOTE_APPEND_CB_CNT; i++) {
 | 
						|
      if (OB_FAIL(remote_append_cb_list[i].init(rpc_proxy, self))) {
 | 
						|
        CLOG_LOG(WARN, "init", K(ret), K(self), K(palf_id), K(rpc_proxy));
 | 
						|
      }
 | 
						|
    }
 | 
						|
    self_ = self;
 | 
						|
    palf_id_ = palf_id;
 | 
						|
    rpc_proxy_ = rpc_proxy;
 | 
						|
    log_service_ = log_service;
 | 
						|
    is_inited_ = true;
 | 
						|
  }
 | 
						|
 | 
						|
  if ((OB_FAIL(ret)) && (OB_INIT_TWICE != ret)) {
 | 
						|
    destroy();
 | 
						|
  }
 | 
						|
  CLOG_LOG(INFO, "ObLogClient init finished", K(ret), K(self));
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int ObLogClient::create_palf_replica(const common::ObMemberList &member_list,
 | 
						|
                                     const int64_t replica_num,
 | 
						|
                                     const int64_t leader_idx)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  if(IS_NOT_INIT) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
    CLOG_LOG(ERROR, "ObLogClient has not been inited", K(ret));
 | 
						|
  } else if (false == member_list.is_valid() || replica_num <= 0) {
 | 
						|
    ret = OB_INVALID_ARGUMENT;
 | 
						|
  } else {
 | 
						|
    share::ObLSID ls_id(palf_id_);
 | 
						|
    share::ObTenantRole tenant_role(share::ObTenantRole::PRIMARY_TENANT);
 | 
						|
    common::ObAddr leader;
 | 
						|
    member_list.get_server_by_index(leader_idx, leader);
 | 
						|
 | 
						|
    LSN init_lsn(0);
 | 
						|
    common::ObILogAllocator *alloc_mgr = NULL;
 | 
						|
    palf::PalfBaseInfo palf_base_info;
 | 
						|
    palf_base_info.generate_by_default();
 | 
						|
    common::GlobalLearnerList learner_list;
 | 
						|
    if (OB_FAIL(log_service_->create_ls(ls_id, ObReplicaType::REPLICA_TYPE_FULL,
 | 
						|
        tenant_role, palf_base_info, true, log_handler_))) {
 | 
						|
      CLOG_LOG(WARN, "create_ls failed", K(ret), K_(palf_id));
 | 
						|
    } else if (OB_FAIL(log_handler_.set_initial_member_list(member_list, replica_num, learner_list))) {
 | 
						|
      CLOG_LOG(WARN, "set_initial_member_list failed", K(ret), K_(palf_id));
 | 
						|
    } else {
 | 
						|
      CLOG_LOG(ERROR, "create_palf_replica success", K(ret), K_(palf_id), K(member_list), K(replica_num), K(leader_idx));
 | 
						|
    }
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
static void *append_fn(void *arg)
 | 
						|
{
 | 
						|
  ObLogClient *client = reinterpret_cast<ObLogClient *>(arg);
 | 
						|
  client->do_submit();
 | 
						|
  return (void *)0;
 | 
						|
}
 | 
						|
 | 
						|
int ObLogClient::submit_append_log_task(const int64_t thread_num, const int64_t log_size)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  common::ObRole role;
 | 
						|
  int64_t unused_pid;
 | 
						|
  if (IS_NOT_INIT) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
  } else if (OB_FAIL(log_handler_.get_role(role, unused_pid))) {
 | 
						|
  } else if (role != LEADER) {
 | 
						|
    CLOG_LOG(ERROR, "client is not leader");
 | 
						|
  } else if (OB_FAIL(lock_.trylock())) {
 | 
						|
    CLOG_LOG(ERROR, "another client running", K(ret));
 | 
						|
  } else {
 | 
						|
    log_size_ = log_size;
 | 
						|
    client_number_ = thread_num;
 | 
						|
    worker_number_ = (client_number_ >= MAX_THREAD_NUM)? MAX_THREAD_NUM: client_number_;
 | 
						|
    pthread_t tids[MAX_THREAD_NUM];
 | 
						|
 | 
						|
    CLOG_LOG(ERROR, "start submit_log", K_(log_size), K(thread_num), K(worker_number_));
 | 
						|
    for (int64_t i = 0; i < worker_number_; i++) {
 | 
						|
      pthread_create(&tids[i], NULL, append_fn, this);
 | 
						|
    }
 | 
						|
 | 
						|
    for (int64_t i = 0; i < worker_number_; i++) {
 | 
						|
      pthread_join(tids[i], NULL);
 | 
						|
    }
 | 
						|
    lock_.unlock();
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int ObLogClient::do_submit()
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  const int64_t NBYTES = 40000;
 | 
						|
  char BUFFER[NBYTES];
 | 
						|
  memset(BUFFER, 'a', NBYTES);
 | 
						|
  const int64_t CB_ARRAY_NUM = (client_number_ >= worker_number_)? client_number_ / worker_number_: 1;
 | 
						|
  MockAppendCb *cb_array = new MockAppendCb[CB_ARRAY_NUM];
 | 
						|
  LSN lsn;
 | 
						|
  SCN log_scn;
 | 
						|
  while (true) {
 | 
						|
    for (int i = 0; i < CB_ARRAY_NUM; i++)
 | 
						|
    {
 | 
						|
      if (cb_array[i].is_called()) {
 | 
						|
        cb_array[i].reset();
 | 
						|
        const int64_t log_size = (log_size_ > 0)? log_size_: ObRandom::rand(100, 1024);
 | 
						|
        cb_array[i].log_size_ = log_size;
 | 
						|
        ret = log_handler_.append(BUFFER, log_size, SCN::min_scn(), true, &cb_array[i], lsn, log_scn);
 | 
						|
        if (OB_SUCCESS != ret) {
 | 
						|
          (void) cb_array[i].on_success();
 | 
						|
        }
 | 
						|
      }
 | 
						|
    }
 | 
						|
  }
 | 
						|
  while (true) {
 | 
						|
    sleep(10);
 | 
						|
  }
 | 
						|
  return ret;
 | 
						|
}
 | 
						|
 | 
						|
int MockAppendCb::on_success()
 | 
						|
{
 | 
						|
  ATOMIC_STORE(&is_called_, true);
 | 
						|
  return OB_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
int ObLogClient::submit_log(const common::ObAddr &client_addr, const int64_t client_id, const palf::LogWriteBuf &log_buf)
 | 
						|
{
 | 
						|
  int ret = OB_SUCCESS;
 | 
						|
  LSN lsn;
 | 
						|
  SCN log_scn;
 | 
						|
  const char *buf = log_buf.write_buf_[0].buf_;
 | 
						|
  const int64_t buf_len = log_buf.write_buf_[0].buf_len_;
 | 
						|
  if (IS_NOT_INIT) {
 | 
						|
    ret = OB_NOT_INIT;
 | 
						|
  } else if (OB_FAIL(remote_append_cb_list[client_id].pre_submit(client_addr, palf_id_, client_id))) {
 | 
						|
    // CLOG_LOG(WARN, "append_cb init failed", K(ret));
 | 
						|
  } else if (OB_FAIL(log_handler_.append(buf, buf_len, SCN::min_scn(), true, &(remote_append_cb_list[client_id]), lsn, log_scn))) {
 | 
						|
    CLOG_LOG(WARN, "append failed", K(ret));
 | 
						|
  }
 | 
						|
  return OB_SUCCESS;
 | 
						|
}
 | 
						|
 | 
						|
void LogRemoteClient::has_returned()
 | 
						|
{
 | 
						|
  const int64_t tmp_rt = common::ObTimeUtility::current_time() - last_submit_ts_;
 | 
						|
 | 
						|
  ATOMIC_STORE(&is_returned_, true);
 | 
						|
  ObThreadCondGuard guard(cond_);
 | 
						|
  cond_.signal();
 | 
						|
 | 
						|
  ATOMIC_INC(&c_append_cnt);
 | 
						|
  ATOMIC_FAA(&c_rt, tmp_rt);
 | 
						|
 | 
						|
  if (REACH_TIME_INTERVAL(1000 *1000)) {
 | 
						|
    int64_t l_append_cnt = ATOMIC_LOAD(&c_append_cnt);
 | 
						|
    if (l_append_cnt == 0) l_append_cnt = 1;
 | 
						|
    int64_t l_rt = ATOMIC_LOAD(&c_rt);
 | 
						|
 | 
						|
    ATOMIC_STORE(&c_append_cnt, 0);
 | 
						|
    ATOMIC_STORE(&c_rt, 0);
 | 
						|
 | 
						|
    CLOG_LOG_RET(ERROR, OB_SUCCESS, "result:", K(l_append_cnt), K(l_rt), "avg_rt", l_rt/l_append_cnt);
 | 
						|
  }
 | 
						|
}
 | 
						|
} // end namespace palfcluster
 | 
						|
} // end namespace oceanbase
 |