[CP] add lock for net_endpoint_ingress service
This commit is contained in:
@ -57,6 +57,7 @@ ObSrvNetworkFrame::ObSrvNetworkFrame(ObGlobalContext &gctx)
|
|||||||
mysql_transport_(NULL),
|
mysql_transport_(NULL),
|
||||||
batch_rpc_transport_(NULL),
|
batch_rpc_transport_(NULL),
|
||||||
last_ssl_info_hash_(UINT64_MAX),
|
last_ssl_info_hash_(UINT64_MAX),
|
||||||
|
lock_(),
|
||||||
standby_fetchlog_bw_limit_(0),
|
standby_fetchlog_bw_limit_(0),
|
||||||
standby_fetchlog_bytes_(0),
|
standby_fetchlog_bytes_(0),
|
||||||
standby_fetchlog_time_(0)
|
standby_fetchlog_time_(0)
|
||||||
@ -787,16 +788,22 @@ int ObSrvNetworkFrame::net_endpoint_register(const ObNetEndpointKey &endpoint_ke
|
|||||||
int ObSrvNetworkFrame::net_endpoint_predict_ingress(const ObNetEndpointKey &endpoint_key, int64_t &predicted_bw)
|
int ObSrvNetworkFrame::net_endpoint_predict_ingress(const ObNetEndpointKey &endpoint_key, int64_t &predicted_bw)
|
||||||
{
|
{
|
||||||
int ret = OB_SUCCESS;
|
int ret = OB_SUCCESS;
|
||||||
|
ObSpinLockGuard guard(lock_);
|
||||||
int64_t current_time = ObTimeUtility::current_time();
|
int64_t current_time = ObTimeUtility::current_time();
|
||||||
uint64_t current_fetchlog_bytes = pn_get_rxbytes(obrpc::ObPocRpcServer::RATELIMIT_PNIO_GROUP);
|
uint64_t current_fetchlog_bytes = pn_get_rxbytes(obrpc::ObPocRpcServer::RATELIMIT_PNIO_GROUP);
|
||||||
uint64_t peroid_bytes = current_fetchlog_bytes - standby_fetchlog_bytes_;
|
uint64_t peroid_bytes = current_fetchlog_bytes - standby_fetchlog_bytes_;
|
||||||
int64_t real_bw = peroid_bytes * 1000000L / (current_time - standby_fetchlog_time_);
|
uint64_t peroid_time = current_time - standby_fetchlog_time_;
|
||||||
|
if (0 >= peroid_time) {
|
||||||
|
ret = OB_ERR_SYS;
|
||||||
|
LOG_WARN("peroid_time is not larger than 0", K(ret), K(endpoint_key), K(peroid_time));
|
||||||
|
} else {
|
||||||
|
int64_t real_bw = peroid_bytes * 1000000L / peroid_time;
|
||||||
if (real_bw <= standby_fetchlog_bw_limit_) {
|
if (real_bw <= standby_fetchlog_bw_limit_) {
|
||||||
predicted_bw = (uint64_t)(real_bw + max(real_bw / 10, 1024 * 1024L));
|
predicted_bw = (uint64_t)(real_bw + max(real_bw / 10, 1024 * 1024L));
|
||||||
} else {
|
} else {
|
||||||
predicted_bw = (uint64_t)(real_bw + real_bw / 2);
|
predicted_bw = (uint64_t)(real_bw + real_bw / 2);
|
||||||
}
|
}
|
||||||
|
}
|
||||||
standby_fetchlog_time_ = current_time;
|
standby_fetchlog_time_ = current_time;
|
||||||
standby_fetchlog_bytes_ = current_fetchlog_bytes;
|
standby_fetchlog_bytes_ = current_fetchlog_bytes;
|
||||||
return ret;
|
return ret;
|
||||||
@ -816,6 +823,7 @@ int ObSrvNetworkFrame::net_endpoint_set_ingress(const ObNetEndpointKey &endpoint
|
|||||||
ret = OB_INVALID_CONFIG;
|
ret = OB_INVALID_CONFIG;
|
||||||
LOG_WARN("assigned bandwidtth is invalid", K(ret), K(endpoint_key), K(assigned_bw));
|
LOG_WARN("assigned bandwidtth is invalid", K(ret), K(endpoint_key), K(assigned_bw));
|
||||||
} else {
|
} else {
|
||||||
|
ObSpinLockGuard guard(lock_);
|
||||||
standby_fetchlog_bw_limit_ = assigned_bw;
|
standby_fetchlog_bw_limit_ = assigned_bw;
|
||||||
standby_fetchlog_time_ = ObTimeUtility::current_time();
|
standby_fetchlog_time_ = ObTimeUtility::current_time();
|
||||||
standby_fetchlog_bytes_ = pn_get_rxbytes(obrpc::ObPocRpcServer::RATELIMIT_PNIO_GROUP);
|
standby_fetchlog_bytes_ = pn_get_rxbytes(obrpc::ObPocRpcServer::RATELIMIT_PNIO_GROUP);
|
||||||
|
@ -103,6 +103,7 @@ private:
|
|||||||
rpc::frame::ObReqTransport *mysql_transport_;
|
rpc::frame::ObReqTransport *mysql_transport_;
|
||||||
rpc::frame::ObReqTransport *batch_rpc_transport_;
|
rpc::frame::ObReqTransport *batch_rpc_transport_;
|
||||||
uint64_t last_ssl_info_hash_;
|
uint64_t last_ssl_info_hash_;
|
||||||
|
ObSpinLock lock_;
|
||||||
int64_t standby_fetchlog_bw_limit_;
|
int64_t standby_fetchlog_bw_limit_;
|
||||||
uint64_t standby_fetchlog_bytes_;
|
uint64_t standby_fetchlog_bytes_;
|
||||||
int64_t standby_fetchlog_time_;
|
int64_t standby_fetchlog_time_;
|
||||||
|
Reference in New Issue
Block a user