diff --git a/src/share/schema/ob_multi_version_schema_service.cpp b/src/share/schema/ob_multi_version_schema_service.cpp index f3f2ff98fc..2db012506b 100644 --- a/src/share/schema/ob_multi_version_schema_service.cpp +++ b/src/share/schema/ob_multi_version_schema_service.cpp @@ -2313,11 +2313,7 @@ int ObMultiVersionSchemaService::async_refresh_schema( const int64_t MAX_RETRY_CNT = 100 * 1000 * 1000L / RETRY_IDLE_TIME; // 100s at most const int64_t SUBMIT_TASK_FREQUENCE = 2 * 1000 * 1000L / RETRY_IDLE_TIME; // each 2s while (OB_SUCC(ret)) { - if (THIS_WORKER.is_timeout() - || (INT64_MAX == THIS_WORKER.get_timeout_ts() && retry_cnt >= MAX_RETRY_CNT)) { - ret = OB_TIMEOUT; - LOG_WARN("already timeout", KR(ret), K(tenant_id), K(schema_version)); - } else if (OB_FAIL(get_tenant_refreshed_schema_version( + if (OB_FAIL(get_tenant_refreshed_schema_version( tenant_id, local_schema_version))) { LOG_WARN("fail to get tenant refreshed schema version", KR(ret), K(tenant_id), K(schema_version)); @@ -2325,6 +2321,10 @@ int ObMultiVersionSchemaService::async_refresh_schema( && (!check_formal || ObSchemaService::is_formal_version(local_schema_version))) { // success break; + } else if (THIS_WORKER.is_timeout() + || (!THIS_WORKER.is_timeout_ts_valid() && retry_cnt >= MAX_RETRY_CNT)) { + ret = OB_TIMEOUT; + LOG_WARN("already timeout", KR(ret), K(tenant_id), K(schema_version)); } else { if (0 == retry_cnt % SUBMIT_TASK_FREQUENCE) { { @@ -2354,8 +2354,14 @@ int ObMultiVersionSchemaService::async_refresh_schema( } } if (OB_SUCC(ret)) { + int64_t sleep_time = RETRY_IDLE_TIME; + if (THIS_WORKER.is_timeout_ts_valid() + && THIS_WORKER.get_timeout_remain() < RETRY_IDLE_TIME) { + int64_t timeout_remain = THIS_WORKER.get_timeout_remain(); + sleep_time = timeout_remain > 0 ? timeout_remain : 0; + } retry_cnt++; - ob_usleep(RETRY_IDLE_TIME); + ob_usleep(static_cast(sleep_time)); } } } diff --git a/src/sql/executor/ob_remote_executor_processor.cpp b/src/sql/executor/ob_remote_executor_processor.cpp index 93459e9321..5b6f026a23 100644 --- a/src/sql/executor/ob_remote_executor_processor.cpp +++ b/src/sql/executor/ob_remote_executor_processor.cpp @@ -105,10 +105,8 @@ int ObRemoteBaseExecuteP::base_before_process(int64_t tenant_schema_version, if (OB_FAIL(ret)) { //do nothing } else if (sys_schema_version > sys_local_version) { - if (OB_FAIL(gctx_.schema_service_->async_refresh_schema( - OB_SYS_TENANT_ID, sys_schema_version))) { - LOG_WARN("fail to push back effective_tenant_id", K(ret), - K(sys_schema_version), K(sys_local_version)); + if (OB_FAIL(try_refresh_schema_(OB_SYS_TENANT_ID, sys_schema_version, session_info->is_inner()))) { + LOG_WARN("fail to try refresh systenant schema", KR(ret), K(sys_schema_version), K(sys_local_version)); } } @@ -135,9 +133,9 @@ int ObRemoteBaseExecuteP::base_before_process(int64_t tenant_schema_version, } else if (tenant_schema_version > tenant_local_version) { // The local schema version is behind. At this point, // you need to refresh the schema version and reacquire schema_guard - if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(tenant_id, tenant_schema_version))) { - LOG_WARN("fail to push back effective_tenant_id", K(ret), K(tenant_schema_version), - K(tenant_id), K(tenant_local_version)); + if (OB_FAIL(try_refresh_schema_(tenant_id, tenant_schema_version, session_info->is_inner()))) { + LOG_WARN("fail to try refresh tenant schema", KR(ret), K(tenant_id), + K(tenant_schema_version), K(tenant_local_version)); } else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard( tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) { LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version)); @@ -831,6 +829,43 @@ void ObRemoteBaseExecuteP::base_cleanup() return; } +template +int ObRemoteBaseExecuteP::try_refresh_schema_(const uint64_t tenant_id, + const int64_t schema_version, + const bool is_inner_sql) +{ + int ret = OB_SUCCESS; + const int64_t timeout_remain = THIS_WORKER.get_timeout_remain(); + if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("arg is invalid", KR(ret), K(tenant_id)); + } else if (OB_UNLIKELY(timeout_remain <= 0)) { + ret = OB_TIMEOUT; + LOG_WARN("THIS_WORKER is timeout", KR(ret), K(tenant_id)); + } else if (OB_ISNULL(gctx_.schema_service_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("schema service is NULL", KR(ret), K(tenant_id)); + } else { + const int64_t orig_timeout_ts = THIS_WORKER.get_timeout_ts(); + const int64_t try_refresh_time = is_inner_sql ? timeout_remain : std::min(10 * 1000L, timeout_remain); + THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + try_refresh_time); + if (OB_FAIL(gctx_.schema_service_->async_refresh_schema( + tenant_id, schema_version))) { + LOG_WARN("fail to refresh schema", KR(ret), K(tenant_id), + K(schema_version), K(try_refresh_time)); + } + THIS_WORKER.set_timeout_ts(orig_timeout_ts); + if (OB_TIMEOUT == ret + && THIS_WORKER.is_timeout_ts_valid() + && !THIS_WORKER.is_timeout()) { + ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH; + LOG_WARN("fail to refresh schema in try refresh time", KR(ret), K(tenant_id), + K(schema_version), K(try_refresh_time)); + } + } + return ret; +} + int ObRpcRemoteExecuteP::init() { int ret = OB_SUCCESS; diff --git a/src/sql/executor/ob_remote_executor_processor.h b/src/sql/executor/ob_remote_executor_processor.h index 0da538310b..a031a3f5ef 100644 --- a/src/sql/executor/ob_remote_executor_processor.h +++ b/src/sql/executor/ob_remote_executor_processor.h @@ -77,6 +77,12 @@ protected: int &err, ObSQLSessionInfo &session, int64_t &retry_times); +private: + // if inner sql goes through this func, we should wait refresh schema until worker timeout, + // otherwise we will wait time in range of min(10ms, timeout_remain) + int try_refresh_schema_(const uint64_t tenant_id, + const int64_t schema_version, + const bool is_inner_sql); protected: const observer::ObGlobalContext &gctx_;