Modify schema related parameters

This commit is contained in:
tino247
2023-09-09 10:59:21 +00:00
committed by ob-robot
parent cade45ca80
commit 930fc3844a
7 changed files with 48 additions and 37 deletions

View File

@ -1207,8 +1207,8 @@ DEF_STR_WITH_CHECKER(_audit_mode, OB_TENANT_PARAMETER, "NONE",
"MYSQL: use mysql audit"
"ORACLE: use oracle audit",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT(_max_schema_slot_num, OB_CLUSTER_PARAMETER, "128", "[2,256]",
"the max schema slot number for each tenant, "
DEF_INT(_max_schema_slot_num, OB_TENANT_PARAMETER, "128", "[2,256]",
"the max schema slot number for multi-version schema memory management, "
"Range: [2, 256] in integer",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_INT_WITH_CHECKER(_ob_query_rate_limit, OB_TENANT_PARAMETER, "-1",
@ -1557,8 +1557,8 @@ DEF_BOOL(_xsolapi_generate_with_clause, OB_TENANT_PARAMETER, "True",
DEF_BOOL(_optimizer_group_by_placement, OB_TENANT_PARAMETER, "True",
"enable group by placement transform rule",
ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_TIME(_wait_interval_after_truncate, OB_CLUSTER_PARAMETER, "30s", "[0s,)",
"time interval for waiting other servers to refresh schema after truncate",
DEF_TIME(_wait_interval_after_parallel_ddl, OB_TENANT_PARAMETER, "30s", "[0s,)",
"time interval for waiting other servers to refresh schema after parallel ddl is done",
ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE));
DEF_CAP(_rebuild_replica_log_lag_threshold, OB_TENANT_PARAMETER, "0M", "[0M,)",
"size of clog files that a replica lag behind leader to trigger rebuild, 0 means never trigger rebuild on purpose. Range: [0, +∞)",

View File

@ -2207,7 +2207,12 @@ int ObMultiVersionSchemaService::add_schema(
// try switch allocator
if (OB_SUCC(ret)) {
bool can_switch = false;
int64_t switch_cnt = ObSchemaService::g_liboblog_mode_ ? init_version_cnt_ : GCONF._max_schema_slot_num;
int64_t max_schema_slot_num = OB_MAX_VERSION_COUNT;
omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id));
if (tenant_config.is_valid()) {
max_schema_slot_num = tenant_config->_max_schema_slot_num;
}
const int64_t switch_cnt = ObSchemaService::g_liboblog_mode_ ? init_version_cnt_ : max_schema_slot_num;
if (OB_FAIL(mem_mgr->check_can_switch_allocator(switch_cnt, can_switch))) {
LOG_WARN("fail to check can switch allocator", KR(ret));
} else if (can_switch) {
@ -2384,8 +2389,9 @@ int ObMultiVersionSchemaService::async_refresh_schema(
// do nothing
} else {
int64_t retry_cnt = 0;
int64_t MAX_RETRY_CNT = 1000;
const __useconds_t RETRY_IDLE_TIME = 100 * 1000L; // 100ms
const __useconds_t RETRY_IDLE_TIME = 10 * 1000L; // 10ms
const int64_t MAX_RETRY_CNT = 100 * 1000 * 1000L / RETRY_IDLE_TIME; // 100s at most
const int64_t SUBMIT_TASK_FREQUENCE = 2 * 1000 * 1000L / RETRY_IDLE_TIME; // each 2s
while (OB_SUCC(ret)) {
if (THIS_WORKER.is_timeout()
|| (INT64_MAX == THIS_WORKER.get_timeout_ts() && retry_cnt >= MAX_RETRY_CNT)) {
@ -2400,21 +2406,13 @@ int ObMultiVersionSchemaService::async_refresh_schema(
// success
break;
} else {
if (0 == retry_cnt % 20) {
// try refresh schema each 2s
{
if (0 == retry_cnt % SUBMIT_TASK_FREQUENCE) {
bool is_dropped = false;
ObSchemaGetterGuard guard;
if (OB_FAIL(get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) {
LOG_WARN("fail to get schema guard", KR(ret));
} else if (OB_FAIL(guard.check_if_tenant_has_been_dropped(tenant_id, is_dropped))) {
if (OB_FAIL(check_if_tenant_has_been_dropped(tenant_id, is_dropped))) {
LOG_WARN("fail to check if tenant has been dropped", KR(ret), K(tenant_id));
} else if (is_dropped) {
ret = OB_TENANT_HAS_BEEN_DROPPED;
LOG_WARN("tenant has been dropped", KR(ret), K(tenant_id));
}
}
if (OB_FAIL(ret)) {
} else if (OB_ISNULL(GCTX.ob_service_)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("observice is null", K(ret));

View File

@ -500,15 +500,24 @@ int ObSchemaMgrCache::put(ObSchemaMgr *schema_mgr,
LOG_INFO("put schema mgr",
"schema version", NULL != schema_mgr ?
schema_mgr->get_schema_version() : OB_INVALID_VERSION);
if (!check_inner_stat()) {
if (OB_UNLIKELY(!check_inner_stat())) {
ret = OB_INNER_STAT_ERROR;
LOG_WARN("inner stat error", K(ret));
} else if (NULL == schema_mgr) {
LOG_WARN("inner stat error", KR(ret));
} else if (OB_ISNULL(schema_mgr)) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(ret), K(schema_mgr));
LOG_WARN("invalid argument", KR(ret), KP(schema_mgr));
} else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == schema_mgr->get_tenant_id())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_id", KR(ret), "tenant_id", schema_mgr->get_tenant_id());
} else {
ObSchemaMgrItem *dst_item = NULL;
bool is_stop = false;
const uint64_t tenant_id = schema_mgr->get_tenant_id();
int64_t max_schema_slot_num = max_cached_num_;
omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id));
if (tenant_config.is_valid()) {
max_schema_slot_num = tenant_config->_max_schema_slot_num;
}
TCWLockGuard guard(lock_);
// 1. In order to avoid the repeated adjustment of the configuration item _max_schema_slot_num that may cause problems
// that may be caused by the invisible version in the history, max_cached_num_ can only be increased during
@ -520,7 +529,7 @@ int ObSchemaMgrCache::put(ObSchemaMgr *schema_mgr,
// and the schema_mgr memory management strategy is different from the schema refresh scenario.
// In order to reduce unnecessary memory usage, a fixed number of 16 slots is also used.
if (!ObSchemaService::g_liboblog_mode_ && FALLBACK != mode_) {
max_cached_num_ = max(max_cached_num_, GCONF._max_schema_slot_num);
max_cached_num_ = max(max_cached_num_, max_schema_slot_num);
}
int64_t target_pos = -1;
for (int64_t i = 0; i < max_cached_num_ && !is_stop; ++i) {

View File

@ -492,15 +492,20 @@ int ObSchemaUtils::construct_inner_table_schemas(
}
return ret;
}
int ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(
const ObTimeoutCtx &ctx,
const uint64_t tenant_id,
const int64_t schema_version,
const int64_t consensus_timeout)
const int64_t schema_version)
{
int ret = OB_SUCCESS;
int64_t start_time = ObTimeUtility::current_time();
ObMultiVersionSchemaService *schema_service = NULL;
int64_t consensus_timeout = 30 * 1000 * 1000L; // 30s
omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id));
if (tenant_config.is_valid()) {
consensus_timeout = tenant_config->_wait_interval_after_parallel_ddl;
}
if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id
|| schema_version <= 0
|| consensus_timeout < 0)) {
@ -525,7 +530,7 @@ int ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(
break;
} else {
if (REACH_TIME_INTERVAL(1000 * 1000L)) { // 1s
LOG_WARN("schema version not sync", K(tenant_id),
LOG_WARN("schema version not sync", K(tenant_id), K(consensus_timeout),
K(refreshed_schema_version), K(consensus_schema_version), K(schema_version));
}
ob_usleep(10 * 1000L); // 10ms

View File

@ -156,8 +156,7 @@ public:
static int try_check_parallel_ddl_schema_in_sync(
const ObTimeoutCtx &ctx,
const uint64_t tenant_id,
const int64_t schema_version,
const int64_t consensus_timeout);
const int64_t schema_version);
private:
static int get_tenant_variable(schema::ObSchemaGetterGuard &schema_guard,
uint64_t tenant_id,

View File

@ -568,8 +568,8 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt)
LOG_WARN("rpc proxy create table failed", KR(ret), "dst", common_rpc_proxy->get_server());
} else {
int64_t refresh_time = ObTimeUtility::current_time();
if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(ctx,
tenant_id, res.schema_version_, GCONF._wait_interval_after_truncate))) {
if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(
ctx, tenant_id, res.schema_version_))) {
LOG_WARN("fail to check paralleld ddl schema in sync", KR(ret), K(res));
}
int64_t end_time = ObTimeUtility::current_time();
@ -2208,8 +2208,8 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st
} else if (!res.is_valid()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("truncate invalid ddl_res", KR(ret), K(res));
} else if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(ctx,
tenant_id, res.task_id_, GCONF._wait_interval_after_truncate))) {
} else if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(
ctx, tenant_id, res.task_id_))) {
LOG_WARN("fail to check parallel ddl schema in sync", KR(ret), K(res));
}
int64_t end_time = ObTimeUtility::current_time();

View File

@ -378,7 +378,7 @@ _transfer_start_rpc_timeout
_transfer_start_trans_timeout
_tx_result_retention
_upgrade_stage
_wait_interval_after_truncate
_wait_interval_after_parallel_ddl
_with_subquery
_xa_gc_interval
_xa_gc_timeout