Consider schema error as election priority
This commit is contained in:
@ -31,6 +31,7 @@ enum class FailureType
|
||||
RESOURCE_NOT_ENOUGH = 2,// 资源不足,如磁盘与内存,通常为环境原因所致
|
||||
PROCESS_HANG = 3,// 流程阻塞,发现某主要流程一直不结束或者不断重试却不能成功
|
||||
MAJORITY_FAILURE = 4,// 多数派异常,如副本的网络与多数派断连
|
||||
SCHEMA_NOT_REFRESHED = 5, // sql may failed when tenant schema not refreshed yet
|
||||
};
|
||||
|
||||
enum class FailureModule
|
||||
@ -40,6 +41,7 @@ enum class FailureModule
|
||||
LOG = 2,
|
||||
TRANSACTION = 3,
|
||||
STORAGE = 4,
|
||||
SCHEMA = 5,
|
||||
};
|
||||
|
||||
enum class FailureLevel
|
||||
@ -66,6 +68,9 @@ inline const char *obj_to_cstring(FailureType type)
|
||||
case FailureType::MAJORITY_FAILURE:
|
||||
ret = "MAJORITY FAILURE";
|
||||
break;
|
||||
case FailureType::SCHEMA_NOT_REFRESHED:
|
||||
ret = "SCHEMA NOT REFRESHED";
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -88,6 +93,9 @@ inline const char *obj_to_cstring(FailureModule module)
|
||||
case FailureModule::STORAGE:
|
||||
ret = "STORAGE";
|
||||
break;
|
||||
case FailureModule::SCHEMA:
|
||||
ret = "SCHEMA";
|
||||
break;
|
||||
default:
|
||||
break;
|
||||
}
|
||||
@ -127,6 +135,7 @@ public:
|
||||
module_(module),
|
||||
level_(level) {}
|
||||
// Returns the failure level stored in level_.
FailureLevel get_failure_level() const { return level_; }
|
||||
// Returns the failure module stored in module_.
FailureModule get_failure_module() const { return module_; }
|
||||
// Assigns `info` to info_ and returns the status code produced by that
// assignment, so callers can detect a failed assign.
int set_info(const ObString &info) {
|
||||
return info_.assign(info);
|
||||
}
|
||||
|
@ -30,6 +30,7 @@
|
||||
#include "logservice/ob_log_service.h"
|
||||
#include "observer/ob_server_event_history_table_operator.h"
|
||||
#include "storage/slog/ob_storage_logger.h"
|
||||
#include "share/schema/ob_multi_version_schema_service.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -47,6 +48,7 @@ ObFailureDetector::ObFailureDetector()
|
||||
has_add_slog_hang_event_(false),
|
||||
has_add_sstable_hang_event_(false),
|
||||
has_add_clog_full_event_(false),
|
||||
has_schema_error_(false),
|
||||
lock_(common::ObLatchIds::ELECTION_LOCK)
|
||||
{
|
||||
COORDINATOR_LOG(INFO, "ObFailureDetector constructed");
|
||||
@ -125,6 +127,7 @@ void ObFailureDetector::destroy()
|
||||
has_add_slog_hang_event_ = false;
|
||||
has_add_sstable_hang_event_ = false;
|
||||
has_add_clog_full_event_ = false;
|
||||
has_schema_error_ = false;
|
||||
COORDINATOR_LOG(INFO, "ObFailureDetector mtl destroy");
|
||||
}
|
||||
|
||||
@ -165,6 +168,8 @@ void ObFailureDetector::detect_failure()
|
||||
detect_sstable_io_failure_();
|
||||
// clog disk full check
|
||||
detect_palf_disk_full_();
|
||||
// schema refreshed check
|
||||
detect_schema_not_refreshed_();
|
||||
}
|
||||
|
||||
int ObFailureDetector::add_failure_event(const FailureEvent &event)
|
||||
@ -320,6 +325,11 @@ bool ObFailureDetector::is_data_disk_has_fatal_error(bool &slog_hang, bool &data
|
||||
|| ATOMIC_LOAD(&has_add_sstable_hang_event_);
|
||||
}
|
||||
|
||||
bool ObFailureDetector::is_schema_not_refreshed()
|
||||
{
|
||||
return ATOMIC_LOAD(&has_schema_error_);
|
||||
}
|
||||
|
||||
void ObFailureDetector::detect_palf_hang_failure_()
|
||||
{
|
||||
LC_TIME_GUARD(1_s);
|
||||
@ -463,6 +473,37 @@ void ObFailureDetector::detect_palf_disk_full_()
|
||||
}
|
||||
}
|
||||
|
||||
void ObFailureDetector::detect_schema_not_refreshed_()
|
||||
{
|
||||
LC_TIME_GUARD(1_s);
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t now = ObTimeUtility::current_time();
|
||||
bool schema_not_refreshed = GSCHEMASERVICE.is_tenant_not_refreshed(MTL_ID());
|
||||
FailureEvent schema_not_refreshed_event(FailureType::SCHEMA_NOT_REFRESHED, FailureModule::SCHEMA, FailureLevel::SERIOUS);
|
||||
if (OB_FAIL(schema_not_refreshed_event.set_info("schema not refreshed"))) {
|
||||
COORDINATOR_LOG(ERROR, "schema_not_refreshed_event set_info failed", KR(ret));
|
||||
} else if (false == ATOMIC_LOAD(&has_schema_error_)) {
|
||||
if (!schema_not_refreshed) {
|
||||
// schema has been refreshed, skip.
|
||||
} else if (OB_FAIL(add_failure_event(schema_not_refreshed_event))) {
|
||||
COORDINATOR_LOG(ERROR, "add_failure_event failed", KR(ret), K(schema_not_refreshed));
|
||||
} else {
|
||||
ATOMIC_SET(&has_schema_error_, true);
|
||||
COORDINATOR_LOG(WARN, "schema not refreshed, add failure event",
|
||||
K(schema_not_refreshed), K(now));
|
||||
}
|
||||
} else {
|
||||
if (schema_not_refreshed) {
|
||||
// schema is still not refreshed, cannot remove failure_event.
|
||||
} else if (OB_FAIL(remove_failure_event(schema_not_refreshed_event))) {
|
||||
COORDINATOR_LOG(ERROR, "remove_failure_event failed", KR(ret), K(schema_not_refreshed));
|
||||
} else {
|
||||
ATOMIC_SET(&has_schema_error_, false);
|
||||
COORDINATOR_LOG(INFO, "schema is refreshed, remove failure event", KR(ret), K(schema_not_refreshed));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
int ObFailureDetector::FailureEventWithRecoverOp::init(const FailureEvent &event,
|
||||
const ObFunction<bool()> &recover_detect_operation)
|
||||
{
|
||||
|
@ -93,6 +93,7 @@ public:
|
||||
void detect_failure();
|
||||
bool is_clog_disk_has_fatal_error();
|
||||
bool is_data_disk_has_fatal_error(bool &slog_hang, bool &data_hang);
|
||||
bool is_schema_not_refreshed();
|
||||
private:
|
||||
// Returns the current value of is_running_.
bool check_is_running_() const { return is_running_; }
|
||||
int insert_event_to_table_(const FailureEvent &event, const ObFunction<bool()> &recover_operation, ObString info);
|
||||
@ -100,6 +101,7 @@ private:
|
||||
void detect_slog_writer_hang_failure_();
|
||||
void detect_sstable_io_failure_();
|
||||
void detect_palf_disk_full_();
|
||||
void detect_schema_not_refreshed_();
|
||||
private:
|
||||
struct FailureEventWithRecoverOp {
|
||||
int init(const FailureEvent &event, const ObFunction<bool()> &recover_detect_operation);
|
||||
@ -118,6 +120,7 @@ private:
|
||||
bool has_add_slog_hang_event_;
|
||||
bool has_add_sstable_hang_event_;
|
||||
bool has_add_clog_full_event_;
|
||||
bool has_schema_error_;
|
||||
ObSpinLock lock_;
|
||||
};
|
||||
|
||||
|
@ -3538,6 +3538,39 @@ bool ObMultiVersionSchemaService::is_tenant_full_schema(const uint64_t tenant_id
|
||||
return bret;
|
||||
}
|
||||
|
||||
// factor of election priority
|
||||
bool ObMultiVersionSchemaService::is_tenant_not_refreshed(const uint64_t tenant_id)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
bool schema_not_refreshed = false;
|
||||
if (OB_FAIL(refresh_full_schema_map_.get_refactored(tenant_id, schema_not_refreshed))) {
|
||||
// 1. tenant not exist
|
||||
// 2. tenant schema not refreshed yet after create tenant or restart observer.
|
||||
schema_not_refreshed = true;
|
||||
LOG_TRACE("fail to get refresh full schema flag from map", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
// 1. when schema_not_refreshed = false, it means tenant refreshed full schema once.
|
||||
// 2. when schema_not_refreshed = true, it means tenant schema should be refreshed or tenant has been dropped.
|
||||
if (schema_not_refreshed) {
|
||||
ObSchemaGetterGuard guard;
|
||||
ObSimpleTenantSchema *tenant_schema = NULL;
|
||||
if (OB_FAIL(get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
|
||||
schema_not_refreshed = false;
|
||||
LOG_WARN("fail to get schema guard", KR(ret), K(tenant_id));
|
||||
} else if (OB_ISNULL(tenant_schema)) {
|
||||
schema_not_refreshed = true;
|
||||
LOG_TRACE("tenant should be refreshed or has been dropped", KR(ret), K(tenant_id));
|
||||
} else if (tenant_schema->is_normal()) {
|
||||
schema_not_refreshed = true;
|
||||
} else {
|
||||
// To make ls leader stable when tenant is in abnormal status.
|
||||
schema_not_refreshed = false;
|
||||
}
|
||||
}
|
||||
}
|
||||
return schema_not_refreshed;
|
||||
}
|
||||
|
||||
// sql should retry when tenant is normal but never refresh schema successfully.
|
||||
bool ObMultiVersionSchemaService::is_schema_error_need_retry(
|
||||
ObSchemaGetterGuard *guard,
|
||||
|
@ -236,6 +236,8 @@ public:
|
||||
|
||||
bool is_tenant_full_schema(const uint64_t tenant_id) const;
|
||||
|
||||
bool is_tenant_not_refreshed(const uint64_t tenant_id);
|
||||
|
||||
// sql should retry when tenant is normal but never refresh schema successfully.
|
||||
bool is_schema_error_need_retry(
|
||||
ObSchemaGetterGuard *guard,
|
||||
|
Reference in New Issue
Block a user