Files
oceanbase/mittest/palf_cluster/logservice/ob_log_client.cpp
2023-09-27 08:43:51 +00:00

237 lines
7.2 KiB
C++

// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#define USING_LOG_PREFIX CLOG
#include "ob_log_client.h"
#include "logservice/palf/palf_env.h"
#include "logservice/palf/palf_handle.h"
#include "lib/utility/ob_macro_utils.h"
#include "lib/thread/ob_thread_name.h" // set_thread_name
#include "lib/function/ob_function.h" // ObFunction
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_proxy.h" // RpcProxy
#include "mittest/palf_cluster/logservice/log_service.h" // LogService
namespace oceanbase
{
using namespace common;
using namespace palf;
namespace palfcluster
{
int64_t c_append_cnt = 0;
int64_t c_rt = 0;
ObLogClient::ObLogClient()
: self_(),
palf_id_(-1),
rpc_proxy_(NULL),
log_handler_(),
lock_(),
log_size_(-1),
election_priority_(),
total_num_(0),
is_inited_(false)
{}
ObLogClient::~ObLogClient()
{
destroy();
}
void ObLogClient::destroy()
{
if (IS_INIT) {
is_inited_ = false;
rpc_proxy_ = NULL;
palf_id_ = -1;
log_handler_.destroy();
log_size_ = -1;
}
}
int ObLogClient::init(const common::ObAddr &self,
const int64_t palf_id,
obrpc::PalfClusterRpcProxy *rpc_proxy,
LogService *log_service)
{
int ret = OB_SUCCESS;
if (OB_UNLIKELY(is_inited_)) {
ret = OB_INIT_TWICE;
CLOG_LOG(WARN, "ObLogClient has been inited", K(ret));
} else if (false == self.is_valid()|| OB_ISNULL(rpc_proxy) || palf_id < 0) {
ret = OB_INVALID_ARGUMENT;
CLOG_LOG(WARN, "invalid argument", K(ret), K(self), K(palf_id), K(rpc_proxy));
} else {
for (int i = 0; i < REMOTE_APPEND_CB_CNT; i++) {
if (OB_FAIL(remote_append_cb_list[i].init(rpc_proxy, self))) {
CLOG_LOG(WARN, "init", K(ret), K(self), K(palf_id), K(rpc_proxy));
}
}
self_ = self;
palf_id_ = palf_id;
rpc_proxy_ = rpc_proxy;
log_service_ = log_service;
is_inited_ = true;
}
if ((OB_FAIL(ret)) && (OB_INIT_TWICE != ret)) {
destroy();
}
CLOG_LOG(INFO, "ObLogClient init finished", K(ret), K(self));
return ret;
}
int ObLogClient::create_palf_replica(const common::ObMemberList &member_list,
const int64_t replica_num,
const int64_t leader_idx)
{
int ret = OB_SUCCESS;
if(IS_NOT_INIT) {
ret = OB_NOT_INIT;
CLOG_LOG(ERROR, "ObLogClient has not been inited", K(ret));
} else if (false == member_list.is_valid() || replica_num <= 0) {
ret = OB_INVALID_ARGUMENT;
} else {
share::ObLSID ls_id(palf_id_);
share::ObTenantRole tenant_role(share::ObTenantRole::PRIMARY_TENANT);
common::ObAddr leader;
member_list.get_server_by_index(leader_idx, leader);
LSN init_lsn(0);
common::ObILogAllocator *alloc_mgr = NULL;
palf::PalfBaseInfo palf_base_info;
palf_base_info.generate_by_default();
common::GlobalLearnerList learner_list;
if (OB_FAIL(log_service_->create_ls(ls_id, ObReplicaType::REPLICA_TYPE_FULL,
tenant_role, palf_base_info, true, log_handler_))) {
CLOG_LOG(WARN, "create_ls failed", K(ret), K_(palf_id));
} else if (OB_FAIL(log_handler_.set_initial_member_list(member_list, replica_num, learner_list))) {
CLOG_LOG(WARN, "set_initial_member_list failed", K(ret), K_(palf_id));
} else {
CLOG_LOG(ERROR, "create_palf_replica success", K(ret), K_(palf_id), K(member_list), K(replica_num), K(leader_idx));
}
}
return ret;
}
static void *append_fn(void *arg)
{
ObLogClient *client = reinterpret_cast<ObLogClient *>(arg);
client->do_submit();
return (void *)0;
}
int ObLogClient::submit_append_log_task(const int64_t thread_num, const int64_t log_size)
{
int ret = OB_SUCCESS;
common::ObRole role;
int64_t unused_pid;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(log_handler_.get_role(role, unused_pid))) {
} else if (role != LEADER) {
CLOG_LOG(ERROR, "client is not leader");
} else if (OB_FAIL(lock_.trylock())) {
CLOG_LOG(ERROR, "another client running", K(ret));
} else {
log_size_ = log_size;
client_number_ = thread_num;
worker_number_ = (client_number_ >= MAX_THREAD_NUM)? MAX_THREAD_NUM: client_number_;
pthread_t tids[MAX_THREAD_NUM];
CLOG_LOG(ERROR, "start submit_log", K_(log_size), K(thread_num), K(worker_number_));
for (int64_t i = 0; i < worker_number_; i++) {
pthread_create(&tids[i], NULL, append_fn, this);
}
for (int64_t i = 0; i < worker_number_; i++) {
pthread_join(tids[i], NULL);
}
lock_.unlock();
}
return ret;
}
int ObLogClient::do_submit()
{
int ret = OB_SUCCESS;
const int64_t NBYTES = 40000;
char BUFFER[NBYTES];
memset(BUFFER, 'a', NBYTES);
const int64_t CB_ARRAY_NUM = (client_number_ >= worker_number_)? client_number_ / worker_number_: 1;
MockAppendCb *cb_array = new MockAppendCb[CB_ARRAY_NUM];
LSN lsn;
SCN log_scn;
while (true) {
for (int i = 0; i < CB_ARRAY_NUM; i++)
{
if (cb_array[i].is_called()) {
cb_array[i].reset();
const int64_t log_size = (log_size_ > 0)? log_size_: ObRandom::rand(100, 1024);
cb_array[i].log_size_ = log_size;
ret = log_handler_.append(BUFFER, log_size, SCN::min_scn(), true, &cb_array[i], lsn, log_scn);
if (OB_SUCCESS != ret) {
(void) cb_array[i].on_success();
}
}
}
}
while (true) {
sleep(10);
}
return ret;
}
int MockAppendCb::on_success()
{
ATOMIC_STORE(&is_called_, true);
return OB_SUCCESS;
}
int ObLogClient::submit_log(const common::ObAddr &client_addr, const int64_t client_id, const palf::LogWriteBuf &log_buf)
{
int ret = OB_SUCCESS;
LSN lsn;
SCN log_scn;
const char *buf = log_buf.write_buf_[0].buf_;
const int64_t buf_len = log_buf.write_buf_[0].buf_len_;
if (IS_NOT_INIT) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(remote_append_cb_list[client_id].pre_submit(client_addr, palf_id_, client_id))) {
// CLOG_LOG(WARN, "append_cb init failed", K(ret));
} else if (OB_FAIL(log_handler_.append(buf, buf_len, SCN::min_scn(), true, &(remote_append_cb_list[client_id]), lsn, log_scn))) {
CLOG_LOG(WARN, "append failed", K(ret));
}
return OB_SUCCESS;
}
void LogRemoteClient::has_returned()
{
const int64_t tmp_rt = common::ObTimeUtility::current_time() - last_submit_ts_;
ATOMIC_STORE(&is_returned_, true);
ObThreadCondGuard guard(cond_);
cond_.signal();
ATOMIC_INC(&c_append_cnt);
ATOMIC_FAA(&c_rt, tmp_rt);
if (REACH_TIME_INTERVAL(1000 *1000)) {
int64_t l_append_cnt = ATOMIC_LOAD(&c_append_cnt);
if (l_append_cnt == 0) l_append_cnt = 1;
int64_t l_rt = ATOMIC_LOAD(&c_rt);
ATOMIC_STORE(&c_append_cnt, 0);
ATOMIC_STORE(&c_rt, 0);
CLOG_LOG_RET(ERROR, OB_SUCCESS, "result:", K(l_append_cnt), K(l_rt), "avg_rt", l_rt/l_append_cnt);
}
}
} // end namespace palfcluster
} // end namespace oceanbase