314 lines
8.0 KiB
C++
314 lines
8.0 KiB
C++
// Copyright (c) 2021 OceanBase
|
|
// OceanBase is licensed under Mulan PubL v2.
|
|
// You can use this software according to the terms and conditions of the Mulan PubL v2.
|
|
// You may obtain a copy of Mulan PubL v2 at:
|
|
// http://license.coscl.org.cn/MulanPubL-2.0
|
|
// THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND,
|
|
// EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT,
|
|
// MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE.
|
|
// See the Mulan PubL v2 for more details.
|
|
|
|
#ifndef OCEANBASE_PALF_CLUSTER_LOG_CLIENT_H_
|
|
#define OCEANBASE_PALF_CLUSTER_LOG_CLIENT_H_
|
|
|
|
#include "share/ob_thread_pool.h"
|
|
#include "share/ob_occam_timer.h"
|
|
#include "lib/hash/ob_linear_hash_map.h" // ObLinearHashMap
|
|
#include "lib/lock/ob_tc_rwlock.h" // RWLock
|
|
#include "common/ob_member_list.h" // common::ObMemberList
|
|
#include "storage/ob_locality_manager.h" // ObLocalityManager
|
|
#include "logservice/palf/palf_handle_impl.h"
|
|
#include "logservice/ob_log_handler.h"
|
|
#include "logservice/ob_log_service.h"
|
|
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_req.h" // ProbeMsg
|
|
#include "mittest/palf_cluster/rpc/palf_cluster_rpc_proxy.h" // ProbeMsg
|
|
|
|
namespace oceanbase
|
|
{
|
|
|
|
namespace palf
|
|
{
|
|
class PalfEnv;
|
|
}
|
|
|
|
namespace obrpc
|
|
{
|
|
class PalfClusterRpcProxy;
|
|
}
|
|
|
|
namespace palfcluster
|
|
{
|
|
class LogService;
|
|
|
|
class MockAppendCb : public logservice::AppendCb
|
|
{
|
|
public:
|
|
MockAppendCb()
|
|
: log_size_(0),
|
|
is_called_(true)
|
|
{ }
|
|
|
|
int on_success() override final;
|
|
|
|
int on_failure() override final
|
|
{
|
|
ATOMIC_STORE(&is_called_, true);
|
|
return OB_SUCCESS;
|
|
}
|
|
|
|
void reset()
|
|
{
|
|
ATOMIC_STORE(&is_called_, false);
|
|
}
|
|
|
|
bool is_called() const
|
|
{
|
|
return ATOMIC_LOAD(&is_called_);
|
|
}
|
|
|
|
public:
|
|
int64_t log_size_;
|
|
bool is_called_;
|
|
};
|
|
|
|
class MockRemoteAppendCb : public logservice::AppendCb
|
|
{
|
|
public:
|
|
MockRemoteAppendCb()
|
|
: rpc_proxy_(NULL),
|
|
client_addr_(),
|
|
is_called_(true)
|
|
{ }
|
|
|
|
~MockRemoteAppendCb()
|
|
{
|
|
rpc_proxy_ = NULL;
|
|
client_addr_.reset();
|
|
}
|
|
|
|
int init(obrpc::PalfClusterRpcProxy *rpc_proxy,
|
|
const common::ObAddr &self)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (rpc_proxy == NULL) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else {
|
|
rpc_proxy_ = rpc_proxy;
|
|
self_ = self;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int pre_submit(const common::ObAddr &client_addr,
|
|
const int64_t palf_id,
|
|
const int64_t client_id)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
if (is_called()) {
|
|
reset();
|
|
client_addr_ = client_addr;
|
|
palf_id_ = palf_id;
|
|
client_id_ = client_id;
|
|
} else {
|
|
ret = OB_EAGAIN;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
int on_success() override final
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
ATOMIC_STORE(&is_called_, true);
|
|
AppendCb::reset();
|
|
if (OB_NOT_NULL(rpc_proxy_) && client_addr_.is_valid()) {
|
|
// notify called
|
|
const int64_t RPC_TIMEOUT_US = 1 * 1000 * 1000;
|
|
SubmitLogCmdResp resp(self_, palf_id_, client_id_);
|
|
static obrpc::ObLogRpcCB<obrpc::OB_LOG_SUBMIT_LOG_CMD_RESP> cb; \
|
|
if (OB_FAIL(rpc_proxy_->to(client_addr_).timeout(RPC_TIMEOUT_US).trace_time(true).
|
|
max_process_handler_time(RPC_TIMEOUT_US).by(MTL_ID()).send_submit_log_resp(resp, &cb))) {
|
|
CLOG_LOG(ERROR, "send_submit_log_resp failed", KR(ret), K(resp));
|
|
}
|
|
}
|
|
return OB_SUCCESS;
|
|
}
|
|
|
|
int on_failure() override final
|
|
{
|
|
ATOMIC_STORE(&is_called_, true);
|
|
if (OB_NOT_NULL(rpc_proxy_) && client_addr_.is_valid()) {
|
|
//notify called
|
|
}
|
|
return OB_SUCCESS;
|
|
}
|
|
|
|
void reset()
|
|
{
|
|
ATOMIC_STORE(&is_called_, false);
|
|
}
|
|
|
|
bool is_called() const
|
|
{
|
|
return ATOMIC_LOAD(&is_called_);
|
|
}
|
|
|
|
obrpc::PalfClusterRpcProxy *rpc_proxy_;
|
|
common::ObAddr client_addr_;
|
|
common::ObAddr self_;
|
|
int64_t palf_id_;
|
|
int64_t client_id_;
|
|
bool is_called_;
|
|
};
|
|
|
|
class LogRemoteClient
|
|
{
|
|
public:
|
|
LogRemoteClient()
|
|
: thread_(),
|
|
th_id_(0),
|
|
log_size_(0),
|
|
rpc_proxy_(NULL),
|
|
self_(),
|
|
dst_(),
|
|
palf_id_(0),
|
|
cond_(),
|
|
is_returned_(true),
|
|
last_submit_ts_(0),
|
|
avg_rt_(-1),
|
|
is_inited_(false) {}
|
|
|
|
int init_and_create(const int64_t th_id,
|
|
const int64_t log_size,
|
|
obrpc::PalfClusterRpcProxy *rpc_proxy,
|
|
const common::ObAddr &self,
|
|
const common::ObAddr &dst,
|
|
const int64_t palf_id)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
cond_.init(ObWaitEventIds::REBALANCE_TASK_MGR_COND_WAIT);
|
|
if (IS_INIT) {
|
|
ret = OB_INIT_TWICE;
|
|
} else if (th_id < 0 || log_size <= 0 ||
|
|
rpc_proxy == NULL || false == self.is_valid()) {
|
|
ret = OB_INVALID_ARGUMENT;
|
|
} else if (0 != pthread_create(&thread_, NULL, do_submit, this)){
|
|
PALF_LOG(ERROR, "create thread fail", K(thread_));
|
|
} else {
|
|
th_id_ = th_id;
|
|
log_size_ = log_size;
|
|
rpc_proxy_ = rpc_proxy;
|
|
self_ = self;
|
|
dst_ = dst;
|
|
palf_id_ = palf_id;
|
|
}
|
|
return ret;
|
|
}
|
|
|
|
void join()
|
|
{
|
|
pthread_join(thread_, NULL);
|
|
}
|
|
|
|
static void* do_submit(void *arg)
|
|
{
|
|
int ret = OB_SUCCESS;
|
|
const int64_t NBYTES = 40000;
|
|
const int64_t RPC_TIMEOUT_US = 1 * 1000 * 1000;
|
|
char BUFFER[NBYTES];
|
|
memset(BUFFER, 'a', NBYTES);
|
|
|
|
LogRemoteClient *client = static_cast<LogRemoteClient *>(arg);
|
|
palf::LogWriteBuf write_buf;
|
|
write_buf.push_back(BUFFER, client->log_size_);
|
|
|
|
SubmitLogCmd req(client->self_, client->palf_id_, client->th_id_, write_buf);
|
|
|
|
while (true) {
|
|
// const bool is_timeout = (common::ObTimeUtility::current_time() - client->last_submit_ts_) > 500 * 1000;
|
|
// if (client->can_submit())
|
|
static obrpc::ObLogRpcCB<obrpc::OB_LOG_SUBMIT_LOG_CMD> cb;
|
|
if (OB_FAIL(client->rpc_proxy_->to(client->dst_).timeout(RPC_TIMEOUT_US).trace_time(true). \
|
|
max_process_handler_time(RPC_TIMEOUT_US).by(MTL_ID()).send_submit_log_cmd(req, &cb))) {
|
|
PALF_LOG(WARN, "send_submit_log_cmd fail", K(req));
|
|
} else {
|
|
client->has_submit();
|
|
ObThreadCondGuard guard(client->cond_);
|
|
client->cond_.wait();
|
|
}
|
|
}
|
|
return NULL;
|
|
}
|
|
|
|
bool can_submit() const
|
|
{
|
|
return ATOMIC_LOAD(&is_returned_);
|
|
}
|
|
|
|
void has_submit()
|
|
{
|
|
ATOMIC_STORE(&is_returned_, false);
|
|
last_submit_ts_ = common::ObTimeUtility::current_time();
|
|
}
|
|
|
|
void has_returned();
|
|
|
|
public:
|
|
pthread_t thread_;
|
|
int64_t th_id_;
|
|
int64_t log_size_;
|
|
obrpc::PalfClusterRpcProxy *rpc_proxy_;
|
|
common::ObAddr self_;
|
|
common::ObAddr dst_;
|
|
int64_t palf_id_;
|
|
mutable common::ObThreadCond cond_;
|
|
bool is_returned_;
|
|
int64_t last_submit_ts_;
|
|
int64_t avg_rt_;
|
|
bool is_inited_;
|
|
};
|
|
|
|
class ObLogClient
|
|
{
|
|
public:
|
|
ObLogClient();
|
|
virtual ~ObLogClient();
|
|
int init(const common::ObAddr &self,
|
|
const int64_t palf_id,
|
|
obrpc::PalfClusterRpcProxy *rpc_proxy,
|
|
palfcluster::LogService *log_service);
|
|
void destroy();
|
|
int create_palf_replica(const common::ObMemberList &member_list,
|
|
const int64_t replica_num,
|
|
const int64_t leader_idx);
|
|
int submit_append_log_task(const int64_t thread_num, const int64_t log_size);
|
|
int submit_log(const common::ObAddr &client_addr, const int64_t client_id, const palf::LogWriteBuf &log_buf);
|
|
int do_submit();
|
|
|
|
share::ObLSID get_ls_id() const { return share::ObLSID(palf_id_); }
|
|
logservice::ObLogHandler *get_log_handler() { return &log_handler_;}
|
|
TO_STRING_KV(K_(palf_id));
|
|
private:
|
|
static const int64_t MAX_THREAD_NUM = 10;
|
|
static const int64_t REMOTE_APPEND_CB_CNT = 4000;
|
|
MockRemoteAppendCb remote_append_cb_list[REMOTE_APPEND_CB_CNT];
|
|
common::ObAddr self_;
|
|
int64_t palf_id_;
|
|
obrpc::PalfClusterRpcProxy *rpc_proxy_;
|
|
logservice::ObLogHandler log_handler_;
|
|
common::ObSpinLock lock_;
|
|
int64_t log_size_;
|
|
logservice::coordinator::ElectionPriorityImpl election_priority_;
|
|
int64_t total_num_;
|
|
palfcluster::LogService *log_service_;
|
|
int64_t client_number_;
|
|
int64_t worker_number_;
|
|
bool is_inited_;
|
|
};
|
|
|
|
|
|
typedef common::ObLinearHashMap<share::ObLSID, palfcluster::ObLogClient*> LogClientMap;
|
|
|
|
} // palfcluster
|
|
} // oceanbase
|
|
|
|
#endif |