From 930fc3844a4f940dae169d36e7688ea1f4cf57a9 Mon Sep 17 00:00:00 2001 From: tino247 Date: Sat, 9 Sep 2023 10:59:21 +0000 Subject: [PATCH] Modify schema related parameters --- src/share/parameter/ob_parameter_seed.ipp | 8 ++--- .../ob_multi_version_schema_service.cpp | 34 +++++++++---------- src/share/schema/ob_schema_mgr_cache.cpp | 19 ++++++++--- src/share/schema/ob_schema_utils.cpp | 11 ++++-- src/share/schema/ob_schema_utils.h | 3 +- src/sql/engine/cmd/ob_table_executor.cpp | 8 ++--- .../all_virtual_sys_parameter_stat.result | 2 +- 7 files changed, 48 insertions(+), 37 deletions(-) diff --git a/src/share/parameter/ob_parameter_seed.ipp b/src/share/parameter/ob_parameter_seed.ipp index b0a50e87de..ebced62904 100755 --- a/src/share/parameter/ob_parameter_seed.ipp +++ b/src/share/parameter/ob_parameter_seed.ipp @@ -1207,8 +1207,8 @@ DEF_STR_WITH_CHECKER(_audit_mode, OB_TENANT_PARAMETER, "NONE", "MYSQL: use mysql audit" "ORACLE: use oracle audit", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_INT(_max_schema_slot_num, OB_CLUSTER_PARAMETER, "128", "[2,256]", - "the max schema slot number for each tenant, " +DEF_INT(_max_schema_slot_num, OB_TENANT_PARAMETER, "128", "[2,256]", + "the max schema slot number for multi-version schema memory management, " "Range: [2, 256] in integer", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_INT_WITH_CHECKER(_ob_query_rate_limit, OB_TENANT_PARAMETER, "-1", @@ -1557,8 +1557,8 @@ DEF_BOOL(_xsolapi_generate_with_clause, OB_TENANT_PARAMETER, "True", DEF_BOOL(_optimizer_group_by_placement, OB_TENANT_PARAMETER, "True", "enable group by placement transform rule", ObParameterAttr(Section::TENANT, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); -DEF_TIME(_wait_interval_after_truncate, OB_CLUSTER_PARAMETER, "30s", "[0s,)", - "time interval for waiting other servers to refresh schema after truncate", +DEF_TIME(_wait_interval_after_parallel_ddl, OB_TENANT_PARAMETER, "30s", "[0s,)", + "time interval for waiting other servers to refresh schema after parallel ddl is done", ObParameterAttr(Section::OBSERVER, Source::DEFAULT, EditLevel::DYNAMIC_EFFECTIVE)); DEF_CAP(_rebuild_replica_log_lag_threshold, OB_TENANT_PARAMETER, "0M", "[0M,)", "size of clog files that a replica lag behind leader to trigger rebuild, 0 means never trigger rebuild on purpose. Range: [0, +∞)", diff --git a/src/share/schema/ob_multi_version_schema_service.cpp b/src/share/schema/ob_multi_version_schema_service.cpp index 44a5b258fa..04e469d49b 100644 --- a/src/share/schema/ob_multi_version_schema_service.cpp +++ b/src/share/schema/ob_multi_version_schema_service.cpp @@ -2207,7 +2207,12 @@ int ObMultiVersionSchemaService::add_schema( // try switch allocator if (OB_SUCC(ret)) { bool can_switch = false; - int64_t switch_cnt = ObSchemaService::g_liboblog_mode_ ? init_version_cnt_ : GCONF._max_schema_slot_num; + int64_t max_schema_slot_num = OB_MAX_VERSION_COUNT; + omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id)); + if (tenant_config.is_valid()) { + max_schema_slot_num = tenant_config->_max_schema_slot_num; + } + const int64_t switch_cnt = ObSchemaService::g_liboblog_mode_ ? init_version_cnt_ : max_schema_slot_num; if (OB_FAIL(mem_mgr->check_can_switch_allocator(switch_cnt, can_switch))) { LOG_WARN("fail to check can switch allocator", KR(ret)); } else if (can_switch) { @@ -2384,8 +2389,9 @@ int ObMultiVersionSchemaService::async_refresh_schema( // do nothing } else { int64_t retry_cnt = 0; - int64_t MAX_RETRY_CNT = 1000; - const __useconds_t RETRY_IDLE_TIME = 100 * 1000L; // 100ms + const __useconds_t RETRY_IDLE_TIME = 10 * 1000L; // 10ms + const int64_t MAX_RETRY_CNT = 100 * 1000 * 1000L / RETRY_IDLE_TIME; // 100s at most + const int64_t SUBMIT_TASK_FREQUENCE = 2 * 1000 * 1000L / RETRY_IDLE_TIME; // each 2s while (OB_SUCC(ret)) { if (THIS_WORKER.is_timeout() || (INT64_MAX == THIS_WORKER.get_timeout_ts() && retry_cnt >= MAX_RETRY_CNT)) { @@ -2400,21 +2406,13 @@ int ObMultiVersionSchemaService::async_refresh_schema( // success break; } else { - if (0 == retry_cnt % 20) { - // try refresh schema each 2s - { - bool is_dropped = false; - ObSchemaGetterGuard guard; - if (OB_FAIL(get_tenant_schema_guard(OB_SYS_TENANT_ID, guard))) { - LOG_WARN("fail to get schema guard", KR(ret)); - } else if (OB_FAIL(guard.check_if_tenant_has_been_dropped(tenant_id, is_dropped))) { - LOG_WARN("fail to check if tenant has been dropped", KR(ret), K(tenant_id)); - } else if (is_dropped) { - ret = OB_TENANT_HAS_BEEN_DROPPED; - LOG_WARN("tenant has been dropped", KR(ret), K(tenant_id)); - } - } - if (OB_FAIL(ret)) { + if (0 == retry_cnt % SUBMIT_TASK_FREQUENCE) { + bool is_dropped = false; + if (OB_FAIL(check_if_tenant_has_been_dropped(tenant_id, is_dropped))) { + LOG_WARN("fail to check if tenant has been dropped", KR(ret), K(tenant_id)); + } else if (is_dropped) { + ret = OB_TENANT_HAS_BEEN_DROPPED; + LOG_WARN("tenant has been dropped", KR(ret), K(tenant_id)); } else if (OB_ISNULL(GCTX.ob_service_)) { ret = OB_ERR_UNEXPECTED; LOG_WARN("observice is null", K(ret)); diff --git a/src/share/schema/ob_schema_mgr_cache.cpp b/src/share/schema/ob_schema_mgr_cache.cpp index 2314f36010..667f1f46dc 100755 --- a/src/share/schema/ob_schema_mgr_cache.cpp +++ b/src/share/schema/ob_schema_mgr_cache.cpp @@ -500,15 +500,24 @@ int ObSchemaMgrCache::put(ObSchemaMgr *schema_mgr, LOG_INFO("put schema mgr", "schema version", NULL != schema_mgr ? schema_mgr->get_schema_version() : OB_INVALID_VERSION); - if (!check_inner_stat()) { + if (OB_UNLIKELY(!check_inner_stat())) { ret = OB_INNER_STAT_ERROR; - LOG_WARN("inner stat error", K(ret)); - } else if (NULL == schema_mgr) { + LOG_WARN("inner stat error", KR(ret)); + } else if (OB_ISNULL(schema_mgr)) { ret = OB_INVALID_ARGUMENT; - LOG_WARN("invalid argument", K(ret), K(schema_mgr)); + LOG_WARN("invalid argument", KR(ret), KP(schema_mgr)); + } else if (OB_UNLIKELY(OB_INVALID_TENANT_ID == schema_mgr->get_tenant_id())) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("invalid tenant_id", KR(ret), "tenant_id", schema_mgr->get_tenant_id()); } else { ObSchemaMgrItem *dst_item = NULL; bool is_stop = false; + const uint64_t tenant_id = schema_mgr->get_tenant_id(); + int64_t max_schema_slot_num = max_cached_num_; + omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id)); + if (tenant_config.is_valid()) { + max_schema_slot_num = tenant_config->_max_schema_slot_num; + } TCWLockGuard guard(lock_); // 1. In order to avoid the repeated adjustment of the configuration item _max_schema_slot_num that may cause problems // that may be caused by the invisible version in the history, max_cached_num_ can only be increased during @@ -520,7 +529,7 @@ int ObSchemaMgrCache::put(ObSchemaMgr *schema_mgr, // and the schema_mgr memory management strategy is different from the schema refresh scenario. // In order to reduce unnecessary memory usage, a fixed number of 16 slots is also used. if (!ObSchemaService::g_liboblog_mode_ && FALLBACK != mode_) { - max_cached_num_ = max(max_cached_num_, GCONF._max_schema_slot_num); + max_cached_num_ = max(max_cached_num_, max_schema_slot_num); } int64_t target_pos = -1; for (int64_t i = 0; i < max_cached_num_ && !is_stop; ++i) { diff --git a/src/share/schema/ob_schema_utils.cpp b/src/share/schema/ob_schema_utils.cpp index e6c5f4c6a0..44b939a6a8 100644 --- a/src/share/schema/ob_schema_utils.cpp +++ b/src/share/schema/ob_schema_utils.cpp @@ -492,15 +492,20 @@ int ObSchemaUtils::construct_inner_table_schemas( } return ret; } + int ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( const ObTimeoutCtx &ctx, const uint64_t tenant_id, - const int64_t schema_version, - const int64_t consensus_timeout) + const int64_t schema_version) { int ret = OB_SUCCESS; int64_t start_time = ObTimeUtility::current_time(); ObMultiVersionSchemaService *schema_service = NULL; + int64_t consensus_timeout = 30 * 1000 * 1000L; // 30s + omt::ObTenantConfigGuard tenant_config(OTC_MGR.get_tenant_config_with_lock(tenant_id)); + if (tenant_config.is_valid()) { + consensus_timeout = tenant_config->_wait_interval_after_parallel_ddl; + } if (OB_UNLIKELY(OB_INVALID_TENANT_ID == tenant_id || schema_version <= 0 || consensus_timeout < 0)) { @@ -525,7 +530,7 @@ int ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( break; } else { if (REACH_TIME_INTERVAL(1000 * 1000L)) { // 1s - LOG_WARN("schema version not sync", K(tenant_id), + LOG_WARN("schema version not sync", K(tenant_id), K(consensus_timeout), K(refreshed_schema_version), K(consensus_schema_version), K(schema_version)); } ob_usleep(10 * 1000L); // 10ms diff --git a/src/share/schema/ob_schema_utils.h b/src/share/schema/ob_schema_utils.h index 2731dd78a5..7bc96aca35 100644 --- a/src/share/schema/ob_schema_utils.h +++ b/src/share/schema/ob_schema_utils.h @@ -156,8 +156,7 @@ public: static int try_check_parallel_ddl_schema_in_sync( const ObTimeoutCtx &ctx, const uint64_t tenant_id, - const int64_t schema_version, - const int64_t consensus_timeout); + const int64_t schema_version); private: static int get_tenant_variable(schema::ObSchemaGetterGuard &schema_guard, uint64_t tenant_id, diff --git a/src/sql/engine/cmd/ob_table_executor.cpp b/src/sql/engine/cmd/ob_table_executor.cpp index 92293044ca..23657d4a7f 100644 --- a/src/sql/engine/cmd/ob_table_executor.cpp +++ b/src/sql/engine/cmd/ob_table_executor.cpp @@ -568,8 +568,8 @@ int ObCreateTableExecutor::execute(ObExecContext &ctx, ObCreateTableStmt &stmt) LOG_WARN("rpc proxy create table failed", KR(ret), "dst", common_rpc_proxy->get_server()); } else { int64_t refresh_time = ObTimeUtility::current_time(); - if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(ctx, - tenant_id, res.schema_version_, GCONF._wait_interval_after_truncate))) { + if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( + ctx, tenant_id, res.schema_version_))) { LOG_WARN("fail to check paralleld ddl schema in sync", KR(ret), K(res)); } int64_t end_time = ObTimeUtility::current_time(); @@ -2208,8 +2208,8 @@ int ObTruncateTableExecutor::execute(ObExecContext &ctx, ObTruncateTableStmt &st } else if (!res.is_valid()) { ret = OB_ERR_UNEXPECTED; LOG_WARN("truncate invalid ddl_res", KR(ret), K(res)); - } else if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync(ctx, - tenant_id, res.task_id_, GCONF._wait_interval_after_truncate))) { + } else if (OB_FAIL(ObSchemaUtils::try_check_parallel_ddl_schema_in_sync( + ctx, tenant_id, res.task_id_))) { LOG_WARN("fail to check parallel ddl schema in sync", KR(ret), K(res)); } int64_t end_time = ObTimeUtility::current_time(); diff --git a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result index caf2d6bda0..6af8b45257 100644 --- a/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result +++ b/tools/deploy/mysql_test/test_suite/inner_table/r/mysql/all_virtual_sys_parameter_stat.result @@ -378,7 +378,7 @@ _transfer_start_rpc_timeout _transfer_start_trans_timeout _tx_result_retention _upgrade_stage -_wait_interval_after_truncate +_wait_interval_after_parallel_ddl _with_subquery _xa_gc_interval _xa_gc_timeout