[CP] Fix performance reduction and reduce the log frequency for wrs and hotspot update
This commit is contained in:
@ -1049,6 +1049,7 @@ int ObServer::start()
|
|||||||
const int64_t start_ts = ObTimeUtility::current_time();
|
const int64_t start_ts = ObTimeUtility::current_time();
|
||||||
int64_t schema_refreshed_ts = 0;
|
int64_t schema_refreshed_ts = 0;
|
||||||
const int64_t expire_time = start_ts + MAX_CHECK_TIME;
|
const int64_t expire_time = start_ts + MAX_CHECK_TIME;
|
||||||
|
tenant_ids.set_max_print_count(512);
|
||||||
|
|
||||||
if (OB_FAIL(multi_tenant_.get_mtl_tenant_ids(tenant_ids))) {
|
if (OB_FAIL(multi_tenant_.get_mtl_tenant_ids(tenant_ids))) {
|
||||||
FLOG_ERROR("get mtl tenant ids fail", KR(ret));
|
FLOG_ERROR("get mtl tenant ids fail", KR(ret));
|
||||||
|
|||||||
@ -10,6 +10,7 @@
|
|||||||
* See the Mulan PubL v2 for more details.
|
* See the Mulan PubL v2 for more details.
|
||||||
*/
|
*/
|
||||||
#include "share/throttle/ob_throttle_common.h"
|
#include "share/throttle/ob_throttle_common.h"
|
||||||
|
#include "common/ob_clock_generator.h"
|
||||||
|
|
||||||
namespace oceanbase {
|
namespace oceanbase {
|
||||||
namespace share {
|
namespace share {
|
||||||
@ -32,7 +33,7 @@ int64_t ObThrottleStat::to_string(char *buf, const int64_t buf_len) const
|
|||||||
bool ObThrottleStat::need_log(const bool is_throttle_now)
|
bool ObThrottleStat::need_log(const bool is_throttle_now)
|
||||||
{
|
{
|
||||||
bool is_need_log = false;
|
bool is_need_log = false;
|
||||||
const int64_t cur_ts = common::ObTimeUtility::current_time();
|
const int64_t cur_ts = ObClockGenerator::getClock();
|
||||||
if (cur_ts - last_log_timestamp > MAX_LOG_INTERVAL) {
|
if (cur_ts - last_log_timestamp > MAX_LOG_INTERVAL) {
|
||||||
is_need_log = true;
|
is_need_log = true;
|
||||||
}
|
}
|
||||||
|
|||||||
@ -49,7 +49,6 @@ ObMemtableCtx::ObMemtableCtx()
|
|||||||
ref_(0),
|
ref_(0),
|
||||||
query_allocator_(),
|
query_allocator_(),
|
||||||
ctx_cb_allocator_(),
|
ctx_cb_allocator_(),
|
||||||
log_conflict_interval_(LOG_CONFLICT_INTERVAL),
|
|
||||||
ctx_(NULL),
|
ctx_(NULL),
|
||||||
truncate_cnt_(0),
|
truncate_cnt_(0),
|
||||||
lock_for_read_retry_count_(0),
|
lock_for_read_retry_count_(0),
|
||||||
@ -141,8 +140,6 @@ void ObMemtableCtx::reset()
|
|||||||
unsynced_cnt_ = 0;
|
unsynced_cnt_ = 0;
|
||||||
unsubmitted_cnt_ = 0;
|
unsubmitted_cnt_ = 0;
|
||||||
lock_mem_ctx_.reset();
|
lock_mem_ctx_.reset();
|
||||||
//FIXME: ctx_ is not reset
|
|
||||||
log_conflict_interval_.reset();
|
|
||||||
mtstat_.reset();
|
mtstat_.reset();
|
||||||
trans_mgr_.reset();
|
trans_mgr_.reset();
|
||||||
log_gen_.reset();
|
log_gen_.reset();
|
||||||
@ -303,7 +300,9 @@ void ObMemtableCtx::on_wlock_retry(const ObMemtableKey& key, const transaction::
|
|||||||
{
|
{
|
||||||
mtstat_.on_wlock_retry();
|
mtstat_.on_wlock_retry();
|
||||||
#define USING_LOG_PREFIX TRANS
|
#define USING_LOG_PREFIX TRANS
|
||||||
|
if (mtstat_.need_print()) {
|
||||||
FLOG_INFO("mvcc_write conflict", K(key), "tx_id", get_tx_id(), K(conflict_tx_id), KPC(this));
|
FLOG_INFO("mvcc_write conflict", K(key), "tx_id", get_tx_id(), K(conflict_tx_id), KPC(this));
|
||||||
|
}
|
||||||
#undef USING_LOG_PREFIX
|
#undef USING_LOG_PREFIX
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -313,7 +312,7 @@ void ObMemtableCtx::on_tsc_retry(const ObMemtableKey& key,
|
|||||||
const transaction::ObTransID &conflict_tx_id)
|
const transaction::ObTransID &conflict_tx_id)
|
||||||
{
|
{
|
||||||
mtstat_.on_tsc_retry();
|
mtstat_.on_tsc_retry();
|
||||||
if (log_conflict_interval_.reach()) {
|
if (mtstat_.need_print()) {
|
||||||
TRANS_LOG_RET(WARN, OB_SUCCESS, "transaction_set_consistency conflict", K(key), K(snapshot_version), K(max_trans_version), K(conflict_tx_id), KPC(this));
|
TRANS_LOG_RET(WARN, OB_SUCCESS, "transaction_set_consistency conflict", K(key), K(snapshot_version), K(max_trans_version), K(conflict_tx_id), KPC(this));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@ -56,6 +56,13 @@ public:
|
|||||||
void on_tsc_retry() { (void)ATOMIC_FAA(&tsc_retry_, 1); }
|
void on_tsc_retry() { (void)ATOMIC_FAA(&tsc_retry_, 1); }
|
||||||
int32_t get_wlock_retry_count() { return ATOMIC_LOAD(&wlock_retry_); }
|
int32_t get_wlock_retry_count() { return ATOMIC_LOAD(&wlock_retry_); }
|
||||||
int32_t get_tsc_retry_count() { return ATOMIC_LOAD(&tsc_retry_); }
|
int32_t get_tsc_retry_count() { return ATOMIC_LOAD(&tsc_retry_); }
|
||||||
|
bool need_print() const
|
||||||
|
{
|
||||||
|
return 1 == wlock_retry_
|
||||||
|
|| 1 == tsc_retry_
|
||||||
|
|| 0 == wlock_retry_ % 10
|
||||||
|
|| 0 == tsc_retry_ % 10;
|
||||||
|
}
|
||||||
private:
|
private:
|
||||||
int32_t wlock_retry_;
|
int32_t wlock_retry_;
|
||||||
int32_t tsc_retry_;
|
int32_t tsc_retry_;
|
||||||
@ -525,7 +532,6 @@ private:
|
|||||||
ObMemtableCtxCbAllocator ctx_cb_allocator_;
|
ObMemtableCtxCbAllocator ctx_cb_allocator_;
|
||||||
ObRedoLogGenerator log_gen_;
|
ObRedoLogGenerator log_gen_;
|
||||||
MemtableCtxStat mtstat_;
|
MemtableCtxStat mtstat_;
|
||||||
ObTimeInterval log_conflict_interval_;
|
|
||||||
transaction::ObPartTransCtx *ctx_;
|
transaction::ObPartTransCtx *ctx_;
|
||||||
int64_t truncate_cnt_;
|
int64_t truncate_cnt_;
|
||||||
// the retry count of lock for read
|
// the retry count of lock for read
|
||||||
|
|||||||
@ -444,7 +444,8 @@ void ObTenantWeakReadService::process_cluster_heartbeat_rpc_cb(
|
|||||||
ATOMIC_INC(&succ_cluster_heartbeat_count_);
|
ATOMIC_INC(&succ_cluster_heartbeat_count_);
|
||||||
ATOMIC_SET(&last_succ_cluster_heartbeat_tstamp_, cur_tstamp);
|
ATOMIC_SET(&last_succ_cluster_heartbeat_tstamp_, cur_tstamp);
|
||||||
} else {
|
} else {
|
||||||
LOG_WARN_RET(OB_ERR_UNEXPECTED, "tenant weak read service cluster heartbeat RPC fail", K(rcode), K(tenant_id_),
|
int ret = err_code;
|
||||||
|
LOG_WARN("tenant weak read service cluster heartbeat RPC fail", K(ret), K(rcode), K(tenant_id_),
|
||||||
K(dst), "cluster_service_tablet_id", cluster_service_.get_cluster_service_tablet_id());
|
K(dst), "cluster_service_tablet_id", cluster_service_.get_cluster_service_tablet_id());
|
||||||
// force refresh cluster service master
|
// force refresh cluster service master
|
||||||
refresh_cluster_service_master_();
|
refresh_cluster_service_master_();
|
||||||
@ -525,7 +526,7 @@ void ObTenantWeakReadService::print_stat_()
|
|||||||
K(min_cluster_version),
|
K(min_cluster_version),
|
||||||
K(max_cluster_version),
|
K(max_cluster_version),
|
||||||
K(get_cluster_version_err),
|
K(get_cluster_version_err),
|
||||||
"cluster_version_delta", cur_tstamp - cluster_version.convert_to_ts(ignore_invalid),
|
"cluster_version_delta", (in_cluster_service ? cur_tstamp - cluster_version.convert_to_ts(ignore_invalid) : -1),
|
||||||
K_(cluster_service_master),
|
K_(cluster_service_master),
|
||||||
"cluster_service_tablet_id", cluster_service_.get_cluster_service_tablet_id(),
|
"cluster_service_tablet_id", cluster_service_.get_cluster_service_tablet_id(),
|
||||||
K_(post_cluster_heartbeat_count),
|
K_(post_cluster_heartbeat_count),
|
||||||
@ -901,7 +902,7 @@ int ObTenantWeakReadService::post_cluster_heartbeat_rpc_(const SCN version,
|
|||||||
if (OB_ISNULL(wrs_rpc_)) {
|
if (OB_ISNULL(wrs_rpc_)) {
|
||||||
ret = OB_NOT_INIT;
|
ret = OB_NOT_INIT;
|
||||||
} else if (OB_FAIL(get_cluster_service_master_(cluster_service_master))) {
|
} else if (OB_FAIL(get_cluster_service_master_(cluster_service_master))) {
|
||||||
LOG_WARN("get cluster service master fail", KR(ret), K(tenant_id_),
|
LOG_TRACE("get cluster service master fail", KR(ret), K(tenant_id_),
|
||||||
"cluster_service_tablet_id", cluster_service_.get_cluster_service_tablet_id());
|
"cluster_service_tablet_id", cluster_service_.get_cluster_service_tablet_id());
|
||||||
} else if (OB_FAIL(wrs_rpc_->post_cluster_heartbeat(cluster_service_master, tenant_id_, req))) {
|
} else if (OB_FAIL(wrs_rpc_->post_cluster_heartbeat(cluster_service_master, tenant_id_, req))) {
|
||||||
LOG_WARN("post cluster heartbeat fail", KR(ret), K(cluster_service_master), K(tenant_id_),
|
LOG_WARN("post cluster heartbeat fail", KR(ret), K(cluster_service_master), K(tenant_id_),
|
||||||
|
|||||||
@ -162,6 +162,8 @@ int ObWeakReadService::check_tenant_can_start_service(const uint64_t tenant_id,
|
|||||||
// success
|
// success
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
|
// tenant not exist
|
||||||
|
can_start_service = true;
|
||||||
FLOG_WARN("change tenant context fail when get weak read service cluster version",
|
FLOG_WARN("change tenant context fail when get weak read service cluster version",
|
||||||
KR(ret), K(tenant_id));
|
KR(ret), K(tenant_id));
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user