Adjust weak read timestamp validation

This commit is contained in:
obdev 2023-05-16 08:47:35 +00:00 committed by ob-robot
parent 51b2ce63be
commit 2b7b728148
12 changed files with 87 additions and 62 deletions

View File

@ -2911,7 +2911,7 @@
"ob_max_read_stale_time": {
"id": 10137,
"name": "ob_max_read_stale_time",
"value": "5000000",
"value": "-1",
"data_type": "int",
"info": "max stale time(us) for weak read query ",
"flags": "GLOBAL | SESSION | NEED_SERIALIZE",

View File

@ -1183,11 +1183,12 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
can_read = false;
if (!ls_id.is_valid()
|| !addr.is_valid()
|| max_stale_time_us <= 0) {
|| !addr.is_valid()) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ls_id), K(addr), K(max_stale_time_us));
} else if (-1 == max_stale_time_us) {
// no need check
can_read = true;
} else if (observer::ObServer::get_instance().get_self() == addr) {
storage::ObLSService *ls_svr = MTL(storage::ObLSService *);
storage::ObLSHandle handle;
@ -1201,16 +1202,12 @@ int ObSqlTransControl::check_ls_readable(const uint64_t tenant_id,
} else if (OB_ISNULL(ls = handle.get_ls())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("id service log stream not exist");
} else {
can_read = true;
}
/*
} else if (ObTimeUtility::current_time() - max_stale_time_us
< ls->get_ls_wrs_handler()->get_ls_weak_read_ts().convert_to_ts()) {
can_read = true;
} else if (REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
LOG_WARN("log stream unreadable", K(ls_id), K(addr), K(max_stale_time_us));
}*/
}
} else {
LOG_TRACE("log stream is not local", K(ls_id), K(addr));
}

View File

@ -715,7 +715,6 @@ ERRSIM_POINT_DEF(ERRSIM_WEAK_READ_SNAPSHOT_DELAY_US);
int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_time,
SCN &snapshot)
{
UNUSED(max_read_stale_time);
int ret = OB_SUCCESS;
bool monotinic_read = true;;
// server weak read version
@ -727,12 +726,12 @@ int ObTransService::get_weak_read_snapshot_version(const int64_t max_read_stale_
// wrs cluster version
} else if (OB_FAIL(GCTX.weak_read_service_->get_cluster_version(tenant_id_, snapshot))) {
TRANS_LOG(WARN, "get weak read snapshot fail", K(ret), KPC(this));
} else {
} else if (-1 == max_read_stale_time) {
// no need to check barrier version
// do nothing
}
if (OB_SUCC(ret)) {
int64_t max_stale_time = ObWeakReadUtil::max_stale_time_for_weak_consistency(tenant_id_, 0);
const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_stale_time
} else {
// check snapshot version barrier which is setted by user system variable
const int64_t snapshot_barrier = ObTimeUtility::current_time() - max_read_stale_time
+ abs(ERRSIM_WEAK_READ_SNAPSHOT_DELAY_US);
if (snapshot.convert_to_ts() < snapshot_barrier) {
TRANS_LOG(WARN, "weak read snapshot too stale", K(snapshot),

View File

@ -15,6 +15,7 @@
#include "observer/ob_server_struct.h" // for GCTX
#include "deps/oblib/src/common/ob_role.h" // role
#include "src/storage/tx/wrs/ob_weak_read_util.h" // ObWeakReadUtil
#include "src/storage/tx/ob_ts_mgr.h"
namespace oceanbase
{
@ -235,26 +236,28 @@ int ObBLService::do_black_list_check_(sqlclient::ObMySQLResult *result)
{
int ret = OB_SUCCESS;
int64_t max_stale_time = 0;
int64_t curr_time_ns = ObTimeUtility::current_time_ns();
while (OB_SUCC(result->next())) {
ObBLKey bl_key;
ObLsInfo ls_info;
SCN gts_scn;
if (OB_FAIL(get_info_from_result_(*result, bl_key, ls_info))) {
TRANS_LOG(WARN, "get_info_from_result_ fail ", KR(ret), K(result));
} else if (LEADER == ls_info.ls_state_) {
// cannot add leader into blacklist
} else if (ls_info.weak_read_scn_ == 0 && ls_info.migrate_status_ == OB_MIGRATE_STATUS_NONE) {
// log stream is initializing, should't be put into blacklist
} else if (OB_FAIL(OB_TS_MGR.get_gts(bl_key.get_tenant_id(), NULL, gts_scn))) {
TRANS_LOG(WARN, "get gts scn error", K(ret), K(bl_key));
} else {
max_stale_time = get_tenant_max_stale_time_(bl_key.get_tenant_id());
int64_t max_stale_time_ns = max_stale_time * 1000;
if (curr_time_ns > ls_info.weak_read_scn_ + max_stale_time_ns) {
if (gts_scn.get_val_for_gts() > ls_info.weak_read_scn_ + max_stale_time_ns) {
// scn is out-of-time,add this log stream into blacklist
if (OB_FAIL(ls_bl_mgr_.update(bl_key, ls_info))) {
TRANS_LOG(WARN, "ls_bl_mgr_ add fail ", K(bl_key), K(ls_info));
}
} else if (curr_time_ns + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) {
} else if (gts_scn.get_val_for_gts() + BLACK_LIST_WHITEWASH_INTERVAL_NS < ls_info.weak_read_scn_ + max_stale_time_ns) {
// scn is new enough,remove this log stream in the blacklist
ls_bl_mgr_.remove(bl_key);
} else {

View File

@ -81,6 +81,7 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls,
{
int ret = OB_SUCCESS;
SCN timestamp;
SCN gts_scn;
const ObLSID &ls_id = ls.get_ls_id();
need_skip = false;
ObMigrationStatus status = ObMigrationStatus::OB_MIGRATION_STATUS_NONE;
@ -99,10 +100,12 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls,
STORAGE_LOG(DEBUG, "fail to generate weak read timestamp", KR(ret), K(max_stale_time));
need_skip = true;
ret = OB_SUCCESS;
} else if (OB_TS_MGR.get_gts(MTL_ID(), NULL, gts_scn)) {
TRANS_LOG(WARN, "get gts scn error", K(ls_id), K(max_stale_time));
} else if (OB_FAIL(ls.get_migration_status(status))
|| ObMigrationStatus::OB_MIGRATION_STATUS_NONE == status ) {
// check the weak read timestamp of the migrated ls
if (timestamp.get_val_for_logservice() > ObTimeUtility::current_time() - 500 * 1000) {
if (timestamp.convert_to_ts() > gts_scn.convert_to_ts() - 500 * 1000) {
STORAGE_LOG(TRACE, "ls received the latest log", K(timestamp));
// clog chases within 500ms, then clear the mark
need_skip = false;
@ -110,8 +113,8 @@ int ObLSWRSHandler::generate_ls_weak_read_snapshot_version(ObLS &ls,
need_skip = true;
}
} else {
int64_t snapshot_version_barrier = ObTimeUtility::current_time() - max_stale_time;
if (timestamp.get_val_for_logservice() <= snapshot_version_barrier) {
int64_t snapshot_version_barrier = gts_scn.convert_to_ts() - max_stale_time;
if (timestamp.convert_to_ts() <= snapshot_version_barrier) {
// rule out these ls to avoid too old weak read timestamp
need_skip = true;
} else {
@ -171,8 +174,9 @@ int ObLSWRSHandler::generate_weak_read_timestamp_(ObLS &ls, const int64_t max_st
|| REACH_TIME_INTERVAL(10 * 1000 * 1000)) {
TRANS_LOG(INFO, "get wrs ts", K(ls_id),
"delta", current_us - timestamp.convert_to_ts(),
K(timestamp),
K(min_tx_service_ts));
"log_service_ts", min_log_service_scn.convert_to_ts(),
"min_tx_service_ts", min_tx_service_ts.convert_to_ts(),
K(timestamp));
// print keep alive info
ls.get_keep_alive_ls_handler()->print_stat_info();
}

View File

@ -20,6 +20,7 @@
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/tx_storage/ob_ls_map.h"
#include "storage/tx_storage/ob_ls_handle.h"
#include "storage/tx/ob_ts_mgr.h"
#include "logservice/ob_log_service.h"
#include "ob_tenant_weak_read_cluster_service.h"
@ -340,8 +341,7 @@ int ObTenantWeakReadClusterService::start_service()
int64_t leader_epoch = 0;
int64_t begin_ts = ObTimeUtility::current_time();
int64_t after_lock_time = 0, begin_query_ts = 0, begin_persist_ts = 0, end_ts = 0;
const int64_t max_stale_time =
ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID()) * 1000;
const int64_t max_stale_time = ObWeakReadUtil::max_stale_time_for_weak_consistency(MTL_ID());
const int64_t tenant_id = MTL_ID();
ISTAT("begin start service", K(tenant_id), K(is_in_service()), K_(can_update_version));
@ -368,7 +368,7 @@ int ObTenantWeakReadClusterService::start_service()
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("WRS leader epoch is invalid", KR(ret), K(leader_epoch));
} else {
SCN cur_min_version, cur_max_version;
SCN cur_min_version, cur_max_version, gts_scn;
bool record_exist = false;
begin_query_ts = ObTimeUtility::current_time();
@ -376,13 +376,15 @@ int ObTenantWeakReadClusterService::start_service()
// query cluster weak read version range in WRS table
if (OB_FAIL(query_cluster_version_range_(cur_min_version, cur_max_version, record_exist))) {
LOG_WARN("query cluster version range from WRS table fail", KR(ret));
} else if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, gts_scn))) {
LOG_WARN("get gts error", K(ret), K(cur_min_version), K(cur_max_version));
} else {
begin_persist_ts = ObTimeUtility::current_time();
// new weak read version delay should smaller than max_stale_time
int64_t new_version = MAX(cur_max_version.get_val_for_gts(), ObTimeUtility::current_time_ns() - max_stale_time);
int64_t new_version = MAX(cur_max_version.convert_to_ts(), gts_scn.convert_to_ts() - max_stale_time);
SCN new_version_scn;
if (OB_FAIL(new_version_scn.convert_for_gts(new_version))) {
if (OB_FAIL(new_version_scn.convert_from_ts(new_version))) {
LOG_ERROR("convert for gts fail", KR(ret));
} else {
SCN new_min_version = new_version_scn;
@ -468,13 +470,13 @@ void ObTenantWeakReadClusterService::get_serve_info(bool &in_service, int64_t &l
leader_epoch = leader_epoch_;
}
int ObTenantWeakReadClusterService::get_version(SCN &version) const
int ObTenantWeakReadClusterService::get_cluster_version(SCN &version) const
{
SCN min_version, max_version;
return get_version(version, min_version, max_version);
return get_cluster_version(version, min_version, max_version);
}
int ObTenantWeakReadClusterService::get_version(SCN &version, SCN &min_version,
int ObTenantWeakReadClusterService::get_cluster_version(SCN &version, SCN &min_version,
SCN &max_version) const
{
int ret = OB_SUCCESS;
@ -612,7 +614,7 @@ bool ObTenantWeakReadClusterService::check_can_update_version_()
return can_update_version_;
}
int ObTenantWeakReadClusterService::update_version(int64_t &affected_rows)
int ObTenantWeakReadClusterService::update_cluster_version(int64_t &affected_rows)
{
int ret = OB_SUCCESS;
SCN new_version;
@ -640,8 +642,8 @@ int ObTenantWeakReadClusterService::update_version(int64_t &affected_rows)
"tenant_id", MTL_ID(), K(cur_leader_epoch), K(leader_epoch_));
} else if (! check_can_update_version_()) {
TRANS_LOG(TRACE, "can not update version");
} else if (FALSE_IT(new_version = compute_version_(skipped_server_count, need_print))) {
// nothing to do
} else if (OB_FAIL(compute_cluster_version_(skipped_server_count, need_print, new_version))) {
TRANS_LOG(WARN, "compute version error", K(ret), K(skipped_server_count), K(need_print));
} else if (OB_UNLIKELY(!new_version.is_valid())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid CLUSTER weak read version", K(new_version), KR(ret));
@ -682,13 +684,19 @@ int ObTenantWeakReadClusterService::update_version(int64_t &affected_rows)
return ret;
}
SCN ObTenantWeakReadClusterService::compute_version_(int64_t &skipped_servers, bool need_print) const
int ObTenantWeakReadClusterService::compute_cluster_version_(int64_t &skipped_servers, bool need_print, SCN &version) const
{
int ret = OB_SUCCESS;
/// min weak read version delay should not smaller than max_stale_time
SCN base_version = ObWeakReadUtil::generate_min_weak_read_version(MTL_ID());
// weak read version should increase monotonically
base_version = SCN::max(current_version_, base_version);
return cluster_version_mgr_.get_version(base_version, skipped_servers, need_print);
SCN base_version;
if (OB_FAIL(ObWeakReadUtil::generate_min_weak_read_version(MTL_ID(), base_version))) {
TRANS_LOG(WARN, "generate min weak read version error", K(ret), K(skipped_servers), K(need_print));
} else {
// weak read version should increase monotonically
base_version = SCN::max(current_version_, base_version);
version = cluster_version_mgr_.get_cluster_version(base_version, skipped_servers, need_print);
}
return ret;
}
bool ObTenantWeakReadClusterService::is_service_master() const

View File

@ -80,8 +80,8 @@ public:
/// @retval OB_NOT_MASTER self is in service, but not wrs leader, should stop service
/// @retval OB_NEED_RETRY wrs not ready,need retry
/// @retval OTHER CODE fail
int get_version(share::SCN &version) const;
int get_version(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const;
int get_cluster_version(share::SCN &version) const;
int get_cluster_version(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const;
/// update wrs version
///
@ -90,7 +90,7 @@ public:
/// @retval OB_NOT_MASTER self is in service, but not wrs leader, should stop service
/// @retval OB_NEED_RETRY need retry
/// @retval OTHER CODE fail
int update_version(int64_t &affected_rows);
int update_cluster_version(int64_t &affected_rows);
bool need_print_skipped_server();
@ -142,8 +142,8 @@ private:
return share::SCN::plus(min_version, WRS_VERSION_GAP_FOR_PERSISTENCE * 1000);
}
void stop_service_impl_();
share::SCN compute_version_(int64_t &skipped_servers, bool need_print) const;
int get_version_(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const;
int compute_cluster_version_(int64_t &skipped_servers, bool need_print, share::SCN &new_version) const;
int get_cluster_version_(share::SCN &version, share::SCN &min_version, share::SCN &max_version) const;
bool need_force_change_leader_();
int force_change_leader_() const;
int verify_candidate_server_(const common::ObAddr &server) const;

View File

@ -121,7 +121,7 @@ int ObTenantWeakReadClusterVersionMgr::update_server_version(const common::ObAdd
return ret;
}
SCN ObTenantWeakReadClusterVersionMgr::get_version(const SCN base_version,
SCN ObTenantWeakReadClusterVersionMgr::get_cluster_version(const SCN base_version,
int64_t &skip_server_count,
const bool force_print) const
{
@ -150,7 +150,7 @@ SCN ObTenantWeakReadClusterVersionMgr::get_version(const SCN base_version,
min_version = version;
}
}
LOG_DEBUG("[WEAK_READ_SERVER_VERSION_MGR] get version from server", K_(tenant_id), K(i),
LOG_DEBUG("[WEAK_READ_SERVER_VERSION_MGR] get cluster version from server", K_(tenant_id), K(i),
K(need_skip), K(base_version), K(min_version), "server_verion", svr_array_.at(i));
}

View File

@ -56,7 +56,7 @@ public:
/// get min server version which not smaller than base_version
/// if no statisfied server version, return base_version
share::SCN get_version(const share::SCN base_version, int64_t &skip_server_count, const bool need_print_server_info) const;
share::SCN get_cluster_version(const share::SCN base_version, int64_t &skip_server_count, const bool need_print_server_info) const;
// get server count in cluster master cached registered servers
int64_t get_server_count() const;

View File

@ -207,7 +207,7 @@ int ObTenantWeakReadService::get_cluster_version_internal_(SCN &version,
int ret = OB_SUCCESS;
if (OB_UNLIKELY(! inited_)) {
ret = OB_NOT_INIT;
} else if (OB_FAIL(cluster_service_.get_version(version))) {
} else if (OB_FAIL(cluster_service_.get_cluster_version(version))) {
LOG_WARN("get weak read cluster version fail", KR(ret), K(version), K(tenant_id_));
if (OB_NEED_RETRY == ret) {
// self may be WRS Leader while not ready, need retry
@ -306,11 +306,19 @@ int ObTenantWeakReadService::update_server_version_with_part_info(const int64_t
}
int ObTenantWeakReadService::generate_server_version(const int64_t epoch_tstamp,
const bool need_print_status)
const bool need_print_status)
{
SCN base_version_when_no_valid_partition = ObWeakReadUtil::generate_min_weak_read_version(tenant_id_);
return svr_version_mgr_.generate_new_version(tenant_id_, epoch_tstamp,
base_version_when_no_valid_partition, need_print_status);
int ret = OB_SUCCESS;
SCN base_version_when_no_valid_partition;
if (OB_FAIL(ObWeakReadUtil::generate_min_weak_read_version(tenant_id_, base_version_when_no_valid_partition))) {
TRANS_LOG(WARN, "generate min weak read version error", K(ret), K_(tenant_id));
} else {
ret = svr_version_mgr_.generate_new_version(tenant_id_,
epoch_tstamp,
base_version_when_no_valid_partition,
need_print_status);
}
return ret;
}
void ObTenantWeakReadService::get_weak_read_stat(ObTenantWeakReadStat &wrs_stat) const
@ -352,7 +360,7 @@ void ObTenantWeakReadService::get_weak_read_stat(ObTenantWeakReadStat &wrs_stat)
wrs_stat.self_ = self_;
// cluster info
cluster_service_.get_version(current_cluster_version, min_cluster_version, max_cluster_version);
cluster_service_.get_cluster_version(current_cluster_version, min_cluster_version, max_cluster_version);
wrs_stat.cluster_version_ = current_cluster_version;
wrs_stat.cluster_version_delta_ = in_service? (cur_tstamp - wrs_stat.cluster_version_.convert_to_ts(ignore_invalid)):0;
wrs_stat.min_cluster_version_ = min_cluster_version;
@ -504,7 +512,7 @@ void ObTenantWeakReadService::print_stat_()
svr_version_mgr_.get_version(sv);
if (in_cluster_service) {
get_cluster_version_err = cluster_service_.get_version(cluster_version, min_cluster_version,
get_cluster_version_err = cluster_service_.get_cluster_version(cluster_version, min_cluster_version,
max_cluster_version);
}
@ -871,7 +879,7 @@ void ObTenantWeakReadService::generate_cluster_version_()
{
int ret = OB_SUCCESS;
int64_t affected_rows = 0;
if (OB_FAIL(cluster_service_.update_version(affected_rows))) {
if (OB_FAIL(cluster_service_.update_cluster_version(affected_rows))) {
bool need_stop_service = false;
bool need_self_check = need_force_self_check_(ret, affected_rows, need_stop_service);
if (need_self_check) {

View File

@ -15,6 +15,7 @@
#include <algorithm>
#include <stdarg.h>
#include <stdint.h>
#include "storage/tx/ob_ts_mgr.h"
namespace oceanbase
{
using namespace common;
@ -44,9 +45,9 @@ int64_t ObWeakReadUtil::replica_keepalive_interval()
// 2. all partitions offline
// 3. all partitions delay too much or in invalid status
// 4. all partitions in migrating and readable snapshot version delay more than 500ms
SCN ObWeakReadUtil::generate_min_weak_read_version(const uint64_t tenant_id)
int ObWeakReadUtil::generate_min_weak_read_version(const uint64_t tenant_id, SCN &scn)
{
SCN base_version_when_no_valid_partition;
int ret = OB_SUCCESS;
int64_t max_stale_time = 0;
bool tenant_config_exist = false;
// generating min weak version version should statisfy following constraint
@ -69,15 +70,20 @@ SCN ObWeakReadUtil::generate_min_weak_read_version(const uint64_t tenant_id)
);
max_stale_time = std::max(max_stale_time, static_cast<int64_t>(DEFAULT_REPLICA_KEEPALIVE_INTERVAL));
// the unit of max_stale_time is us,we should change to ns
base_version_when_no_valid_partition.convert_from_ts(ObTimeUtility::current_time() - max_stale_time);
SCN tmp_scn;
if (OB_FAIL(OB_TS_MGR.get_gts(tenant_id, NULL, tmp_scn))) {
TRANS_LOG(WARN, "get gts cache error", K(ret), K(tenant_id));
} else {
// the unit of max_stale_time is us,we should change to ns
scn.convert_from_ts(tmp_scn.convert_to_ts() - max_stale_time);
}
if ((!tenant_config_exist) && REACH_TIME_INTERVAL(1 * 1000 * 1000L)) {
TRANS_LOG_RET(WARN, OB_ERR_UNEXPECTED, "tenant not exist when generate min weak read version, use default max stale time instead",
K(tenant_id), K(base_version_when_no_valid_partition), K(lbt()));
K(tenant_id), K(tmp_scn), K(lbt()));
}
return base_version_when_no_valid_partition;
return ret;
}
bool ObWeakReadUtil::enable_monotonic_weak_read(const uint64_t tenant_id)

View File

@ -31,7 +31,7 @@ public:
static const int64_t DEFAULT_REPLICA_KEEPALIVE_INTERVAL = 3000 * 1000L;
static const int64_t IGNORE_TENANT_EXIST_WARN = 1;
static int64_t replica_keepalive_interval();
static share::SCN generate_min_weak_read_version(const uint64_t tenant_id);
static int generate_min_weak_read_version(const uint64_t tenant_id, share::SCN &scn);
static bool enable_monotonic_weak_read(const uint64_t tenant_id);
static int64_t max_stale_time_for_weak_consistency(const uint64_t tenant_id, int64_t ignore_warn = 0);
static bool check_weak_read_service_available();