Files
oceanbase/mittest/palf_cluster/logservice/ob_log_client.h
2023-09-27 08:43:51 +00:00

314 lines
8.0 KiB
C++

// Copyright (c) 2021 OceanBase
// OceanBase is licensed under Mulan PubL v2.
// You can use this software according to the terms and conditions of the Mulan PubL v2.
// You may obtain a copy of Mulan PubL v2 at:
// http://license.coscl.org.cn/MulanPubL-2.0
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
// See the Mulan PubL v2 for more details.
#ifndef OCEANBASE_PALF_CLUSTER_LOG_CLIENT_H_
#define OCEANBASE_PALF_CLUSTER_LOG_CLIENT_H_
#include "share/ob_thread_pool.h"
#include "share/ob_occam_timer.h"
#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap
#include "lib/lock/ob_tc_rwlock.h" // RWLock
#include "common/ob_member_list.h" // common::ObMemberList
#include "storage/ob_locality_manager.h" // ObLocalityManager
#include "logservice/palf/palf_handle_impl.h"
#include "logservice/ob_log_handler.h"
#include "logservice/ob_log_service.h"
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_req.h" // ProbeMsg
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_proxy.h" // ProbeMsg
namespace oceanbase
{
namespace palf
{
class PalfEnv;
}
namespace obrpc
{
class PalfClusterRpcProxy;
}
namespace palfcluster
{
class LogService;
class MockAppendCb : public logservice::AppendCb
{
public:
MockAppendCb()
: log_size_(0),
is_called_(true)
{ }
int on_success() override final;
int on_failure() override final
{
ATOMIC_STORE(&is_called_, true);
return OB_SUCCESS;
}
void reset()
{
ATOMIC_STORE(&is_called_, false);
}
bool is_called() const
{
return ATOMIC_LOAD(&is_called_);
}
public:
int64_t log_size_;
bool is_called_;
};
class MockRemoteAppendCb : public logservice::AppendCb
{
public:
MockRemoteAppendCb()
: rpc_proxy_(NULL),
client_addr_(),
is_called_(true)
{ }
~MockRemoteAppendCb()
{
rpc_proxy_ = NULL;
client_addr_.reset();
}
int init(obrpc::PalfClusterRpcProxy *rpc_proxy,
const common::ObAddr &self)
{
int ret = OB_SUCCESS;
if (rpc_proxy == NULL) {
ret = OB_INVALID_ARGUMENT;
} else {
rpc_proxy_ = rpc_proxy;
self_ = self;
}
return ret;
}
int pre_submit(const common::ObAddr &client_addr,
const int64_t palf_id,
const int64_t client_id)
{
int ret = OB_SUCCESS;
if (is_called()) {
reset();
client_addr_ = client_addr;
palf_id_ = palf_id;
client_id_ = client_id;
} else {
ret = OB_EAGAIN;
}
return ret;
}
int on_success() override final
{
int ret = OB_SUCCESS;
ATOMIC_STORE(&is_called_, true);
AppendCb::reset();
if (OB_NOT_NULL(rpc_proxy_) && client_addr_.is_valid()) {
// notify called
const int64_t RPC_TIMEOUT_US = 1 * 1000 * 1000;
SubmitLogCmdResp resp(self_, palf_id_, client_id_);
static obrpc::ObLogRpcCB<obrpc::OB_LOG_SUBMIT_LOG_CMD_RESP> cb; \
if (OB_FAIL(rpc_proxy_->to(client_addr_).timeout(RPC_TIMEOUT_US).trace_time(true).
max_process_handler_time(RPC_TIMEOUT_US).by(MTL_ID()).send_submit_log_resp(resp, &cb))) {
CLOG_LOG(ERROR, "send_submit_log_resp failed", KR(ret), K(resp));
}
}
return OB_SUCCESS;
}
int on_failure() override final
{
ATOMIC_STORE(&is_called_, true);
if (OB_NOT_NULL(rpc_proxy_) && client_addr_.is_valid()) {
//notify called
}
return OB_SUCCESS;
}
void reset()
{
ATOMIC_STORE(&is_called_, false);
}
bool is_called() const
{
return ATOMIC_LOAD(&is_called_);
}
obrpc::PalfClusterRpcProxy *rpc_proxy_;
common::ObAddr client_addr_;
common::ObAddr self_;
int64_t palf_id_;
int64_t client_id_;
bool is_called_;
};
class LogRemoteClient
{
public:
LogRemoteClient()
: thread_(),
th_id_(0),
log_size_(0),
rpc_proxy_(NULL),
self_(),
dst_(),
palf_id_(0),
cond_(),
is_returned_(true),
last_submit_ts_(0),
avg_rt_(-1),
is_inited_(false) {}
int init_and_create(const int64_t th_id,
const int64_t log_size,
obrpc::PalfClusterRpcProxy *rpc_proxy,
const common::ObAddr &self,
const common::ObAddr &dst,
const int64_t palf_id)
{
int ret = OB_SUCCESS;
cond_.init(ObWaitEventIds::REBALANCE_TASK_MGR_COND_WAIT);
if (IS_INIT) {
ret = OB_INIT_TWICE;
} else if (th_id < 0 || log_size <= 0 ||
rpc_proxy == NULL || false == self.is_valid()) {
ret = OB_INVALID_ARGUMENT;
} else if (0 != pthread_create(&thread_, NULL, do_submit, this)){
PALF_LOG(ERROR, "create thread fail", K(thread_));
} else {
th_id_ = th_id;
log_size_ = log_size;
rpc_proxy_ = rpc_proxy;
self_ = self;
dst_ = dst;
palf_id_ = palf_id;
}
return ret;
}
void join()
{
pthread_join(thread_, NULL);
}
static void* do_submit(void *arg)
{
int ret = OB_SUCCESS;
const int64_t NBYTES = 40000;
const int64_t RPC_TIMEOUT_US = 1 * 1000 * 1000;
char BUFFER[NBYTES];
memset(BUFFER, 'a', NBYTES);
LogRemoteClient *client = static_cast<LogRemoteClient *>(arg);
palf::LogWriteBuf write_buf;
write_buf.push_back(BUFFER, client->log_size_);
SubmitLogCmd req(client->self_, client->palf_id_, client->th_id_, write_buf);
while (true) {
// const bool is_timeout = (common::ObTimeUtility::current_time() - client->last_submit_ts_) > 500 * 1000;
// if (client->can_submit())
static obrpc::ObLogRpcCB<obrpc::OB_LOG_SUBMIT_LOG_CMD> cb;
if (OB_FAIL(client->rpc_proxy_->to(client->dst_).timeout(RPC_TIMEOUT_US).trace_time(true). \
max_process_handler_time(RPC_TIMEOUT_US).by(MTL_ID()).send_submit_log_cmd(req, &cb))) {
PALF_LOG(WARN, "send_submit_log_cmd fail", K(req));
} else {
client->has_submit();
ObThreadCondGuard guard(client->cond_);
client->cond_.wait();
}
}
return NULL;
}
bool can_submit() const
{
return ATOMIC_LOAD(&is_returned_);
}
void has_submit()
{
ATOMIC_STORE(&is_returned_, false);
last_submit_ts_ = common::ObTimeUtility::current_time();
}
void has_returned();
public:
pthread_t thread_;
int64_t th_id_;
int64_t log_size_;
obrpc::PalfClusterRpcProxy *rpc_proxy_;
common::ObAddr self_;
common::ObAddr dst_;
int64_t palf_id_;
mutable common::ObThreadCond cond_;
bool is_returned_;
int64_t last_submit_ts_;
int64_t avg_rt_;
bool is_inited_;
};
class ObLogClient
{
public:
ObLogClient();
virtual ~ObLogClient();
int init(const common::ObAddr &self,
const int64_t palf_id,
obrpc::PalfClusterRpcProxy *rpc_proxy,
palfcluster::LogService *log_service);
void destroy();
int create_palf_replica(const common::ObMemberList &member_list,
const int64_t replica_num,
const int64_t leader_idx);
int submit_append_log_task(const int64_t thread_num, const int64_t log_size);
int submit_log(const common::ObAddr &client_addr, const int64_t client_id, const palf::LogWriteBuf &log_buf);
int do_submit();
share::ObLSID get_ls_id() const { return share::ObLSID(palf_id_); }
logservice::ObLogHandler *get_log_handler() { return &log_handler_;}
TO_STRING_KV(K_(palf_id));
private:
static const int64_t MAX_THREAD_NUM = 10;
static const int64_t REMOTE_APPEND_CB_CNT = 4000;
MockRemoteAppendCb remote_append_cb_list[REMOTE_APPEND_CB_CNT];
common::ObAddr self_;
int64_t palf_id_;
obrpc::PalfClusterRpcProxy *rpc_proxy_;
logservice::ObLogHandler log_handler_;
common::ObSpinLock lock_;
int64_t log_size_;
logservice::coordinator::ElectionPriorityImpl election_priority_;
int64_t total_num_;
palfcluster::LogService *log_service_;
int64_t client_number_;
int64_t worker_number_;
bool is_inited_;
};
typedef common::ObLinearHashMap<share::ObLSID, palfcluster::ObLogClient*> LogClientMap;
} // palfcluster
} // oceanbase
#endif