[NetStandby] [Patch From 4.2.1 BP3] NetStandby Fetching Log Performance Optimization
This commit is contained in:
committed by
ob-robot
parent
f70e4019e2
commit
1d68d4b1ee
@ -1133,34 +1133,22 @@ int FetchLogARpc::analyze_result_(RpcRequest &rpc_req,
|
||||
ERRSIM_POINT_DEF(ALLOC_FETCH_LOG_ARPC_CB_FAIL);
|
||||
rpc::frame::ObReqTransport::AsyncCB *FetchLogARpc::RpcCB::clone(const rpc::frame::SPAlloc &alloc) const
|
||||
{
|
||||
void *buf = NULL;
|
||||
RpcCB *cb = NULL;
|
||||
if (OB_SUCCESS != ALLOC_FETCH_LOG_ARPC_CB_FAIL) {
|
||||
LOG_ERROR_RET(ALLOC_FETCH_LOG_ARPC_CB_FAIL, "ALLOC_FETCH_LOG_ARPC_CB_FAIL");
|
||||
} else if (OB_ISNULL(buf = alloc(sizeof(RpcCB)))) {
|
||||
LOG_ERROR_RET(OB_ALLOCATE_MEMORY_FAILED, "clone rpc callback fail", K(buf), K(sizeof(RpcCB)));
|
||||
} else if (OB_ISNULL(cb = new(buf) RpcCB(host_))) {
|
||||
LOG_ERROR_RET(OB_ALLOCATE_MEMORY_FAILED, "construct RpcCB fail", K(buf));
|
||||
} else {
|
||||
// 成功
|
||||
}
|
||||
|
||||
return cb;
|
||||
return static_cast<rpc::frame::ObReqTransport::AsyncCB *>(const_cast<FetchLogARpc::RpcCB*>(this));
|
||||
}
|
||||
|
||||
int FetchLogARpc::RpcCB::process()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObCdcLSFetchLogResp &result = RpcCBBase::result_;
|
||||
ObRpcResultCode &rcode = RpcCBBase::rcode_;
|
||||
const common::ObAddr &svr = RpcCBBase::dst_;
|
||||
const ObRpcResultCode rcode = RpcCBBase::rcode_;
|
||||
const common::ObAddr svr = RpcCBBase::dst_;
|
||||
|
||||
if (OB_FAIL(do_process_(rcode, &result))) {
|
||||
LOG_ERROR("process fetch log callback fail", KR(ret), K(result), K(rcode), K(svr));
|
||||
LOG_ERROR("process fetch log callback fail", KR(ret), K(rcode), K(svr));
|
||||
}
|
||||
// Aone:
|
||||
// Note: Active destructe response after asynchronous RPC processing
|
||||
result.reset();
|
||||
// result.reset();
|
||||
|
||||
return ret;
|
||||
}
|
||||
@ -1169,7 +1157,7 @@ void FetchLogARpc::RpcCB::on_timeout()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObRpcResultCode rcode;
|
||||
const common::ObAddr &svr = RpcCBBase::dst_;
|
||||
const common::ObAddr svr = RpcCBBase::dst_;
|
||||
|
||||
rcode.rcode_ = OB_TIMEOUT;
|
||||
(void)snprintf(rcode.msg_, sizeof(rcode.msg_), "fetch log rpc timeout, svr=%s",
|
||||
@ -1184,7 +1172,7 @@ void FetchLogARpc::RpcCB::on_invalid()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObRpcResultCode rcode;
|
||||
const common::ObAddr &svr = RpcCBBase::dst_;
|
||||
const common::ObAddr svr = RpcCBBase::dst_;
|
||||
|
||||
// Invalid package encountered, decode failed
|
||||
rcode.rcode_ = OB_RPC_PACKET_INVALID;
|
||||
@ -1204,7 +1192,7 @@ int FetchLogARpc::RpcCB::do_process_(const ObRpcResultCode &rcode, const ObCdcLS
|
||||
FetchLogARpc &rpc_host = rpc_req.host_;
|
||||
|
||||
if (OB_FAIL(rpc_host.handle_rpc_response(rpc_req, rcode, resp))) {
|
||||
LOG_ERROR("set fetch log response fail", KR(ret), K(resp), K(rcode));
|
||||
LOG_ERROR("set fetch log response fail", KR(ret), K(rcode));
|
||||
} else {
|
||||
// success
|
||||
}
|
||||
|
||||
@ -297,7 +297,7 @@ private:
|
||||
{
|
||||
public:
|
||||
explicit RpcCB(RpcRequest &host) : host_(host) {}
|
||||
virtual ~RpcCB() {}
|
||||
virtual ~RpcCB() { result_.reset(); }
|
||||
|
||||
public:
|
||||
rpc::frame::ObReqTransport::AsyncCB *clone(const rpc::frame::SPAlloc &alloc) const;
|
||||
|
||||
@ -29,7 +29,7 @@ namespace logservice
|
||||
class ObLogService;
|
||||
class ObLogRestoreDriverBase
|
||||
{
|
||||
const int64_t FETCH_LOG_AHEAD_THRESHOLD_NS = 3 * 1000 * 1000 *1000L; // 3s
|
||||
const int64_t FETCH_LOG_AHEAD_THRESHOLD_NS = 6 * 1000 * 1000 *1000L; // 6s
|
||||
public:
|
||||
ObLogRestoreDriverBase();
|
||||
virtual ~ObLogRestoreDriverBase();
|
||||
|
||||
@ -474,7 +474,7 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
|
||||
const ObTxBufferNodeArray &source_data =
|
||||
commit_log.get_multi_source_data();
|
||||
const uint64_t exec_tenant_id = gen_meta_tenant_id(tenant_id_);
|
||||
START_TRANSACTION(proxy_, exec_tenant_id)
|
||||
ObMySQLTransaction trans;
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < source_data.count(); ++i) {
|
||||
const ObTxBufferNode &node = source_data.at(i);
|
||||
if (ObTxDataSourceType::STANDBY_UPGRADE == node.get_data_source_type()) {
|
||||
@ -484,6 +484,8 @@ int ObRecoveryLSService::process_ls_tx_log_(ObTxLogBlock &tx_log_block, const SC
|
||||
} else if (ObTxDataSourceType::LS_TABLE != node.get_data_source_type()
|
||||
&& ObTxDataSourceType::TRANSFER_TASK != node.get_data_source_type()) {
|
||||
// nothing
|
||||
} else if (! trans.is_started() && OB_FAIL(trans.start(proxy_, exec_tenant_id))) {
|
||||
LOG_WARN("failed to start trans", KR(ret), K(exec_tenant_id));
|
||||
} else if (FALSE_IT(has_operation = true)) {
|
||||
//can not be there;
|
||||
} else if (OB_FAIL(check_valid_to_operator_ls_(sync_scn))) {
|
||||
|
||||
Reference in New Issue
Block a user