[CP] [IMPROVE] add a new schema refresh implementation that waits at most 10ms
This commit is contained in:
		| @ -2313,11 +2313,7 @@ int ObMultiVersionSchemaService::async_refresh_schema( | |||||||
|     const int64_t MAX_RETRY_CNT = 100 * 1000 * 1000L / RETRY_IDLE_TIME; // 100s at most |     const int64_t MAX_RETRY_CNT = 100 * 1000 * 1000L / RETRY_IDLE_TIME; // 100s at most | ||||||
|     const int64_t SUBMIT_TASK_FREQUENCE = 2 * 1000 * 1000L / RETRY_IDLE_TIME; // each 2s |     const int64_t SUBMIT_TASK_FREQUENCE = 2 * 1000 * 1000L / RETRY_IDLE_TIME; // each 2s | ||||||
|     while (OB_SUCC(ret)) { |     while (OB_SUCC(ret)) { | ||||||
|       if (THIS_WORKER.is_timeout() |       if (OB_FAIL(get_tenant_refreshed_schema_version( | ||||||
|           || (INT64_MAX == THIS_WORKER.get_timeout_ts() && retry_cnt >= MAX_RETRY_CNT)) { |  | ||||||
|         ret = OB_TIMEOUT; |  | ||||||
|         LOG_WARN("already timeout", KR(ret), K(tenant_id), K(schema_version)); |  | ||||||
|       } else if (OB_FAIL(get_tenant_refreshed_schema_version( |  | ||||||
|                          tenant_id, local_schema_version))) { |                          tenant_id, local_schema_version))) { | ||||||
|         LOG_WARN("fail to get tenant refreshed schema version", |         LOG_WARN("fail to get tenant refreshed schema version", | ||||||
|                  KR(ret), K(tenant_id), K(schema_version)); |                  KR(ret), K(tenant_id), K(schema_version)); | ||||||
| @ -2325,6 +2321,10 @@ int ObMultiVersionSchemaService::async_refresh_schema( | |||||||
|                  && (!check_formal || ObSchemaService::is_formal_version(local_schema_version))) { |                  && (!check_formal || ObSchemaService::is_formal_version(local_schema_version))) { | ||||||
|         // success |         // success | ||||||
|         break; |         break; | ||||||
|  |       } else if (THIS_WORKER.is_timeout() | ||||||
|  |                 || (!THIS_WORKER.is_timeout_ts_valid() && retry_cnt >= MAX_RETRY_CNT)) { | ||||||
|  |         ret = OB_TIMEOUT; | ||||||
|  |         LOG_WARN("already timeout", KR(ret), K(tenant_id), K(schema_version)); | ||||||
|       } else { |       } else { | ||||||
|         if (0 == retry_cnt % SUBMIT_TASK_FREQUENCE) { |         if (0 == retry_cnt % SUBMIT_TASK_FREQUENCE) { | ||||||
|           { |           { | ||||||
| @ -2354,8 +2354,14 @@ int ObMultiVersionSchemaService::async_refresh_schema( | |||||||
|           } |           } | ||||||
|         } |         } | ||||||
|         if (OB_SUCC(ret)) { |         if (OB_SUCC(ret)) { | ||||||
|  |           int64_t sleep_time = RETRY_IDLE_TIME; | ||||||
|  |           if (THIS_WORKER.is_timeout_ts_valid() | ||||||
|  |               && THIS_WORKER.get_timeout_remain() < RETRY_IDLE_TIME) { | ||||||
|  |             int64_t timeout_remain = THIS_WORKER.get_timeout_remain(); | ||||||
|  |             sleep_time = timeout_remain > 0 ? timeout_remain : 0; | ||||||
|  |           } | ||||||
|           retry_cnt++; |           retry_cnt++; | ||||||
|           ob_usleep(RETRY_IDLE_TIME); |           ob_usleep(static_cast<useconds_t>(sleep_time)); | ||||||
|         } |         } | ||||||
|       } |       } | ||||||
|     } |     } | ||||||
|  | |||||||
| @ -105,10 +105,8 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version, | |||||||
|   if (OB_FAIL(ret)) { |   if (OB_FAIL(ret)) { | ||||||
|     //do nothing |     //do nothing | ||||||
|   } else if (sys_schema_version > sys_local_version) { |   } else if (sys_schema_version > sys_local_version) { | ||||||
|     if (OB_FAIL(gctx_.schema_service_->async_refresh_schema( |     if (OB_FAIL(try_refresh_schema_(OB_SYS_TENANT_ID, sys_schema_version, session_info->is_inner()))) { | ||||||
|                 OB_SYS_TENANT_ID, sys_schema_version))) { |       LOG_WARN("fail to try refresh systenant schema", KR(ret), K(sys_schema_version), K(sys_local_version)); | ||||||
|       LOG_WARN("fail to push back effective_tenant_id", K(ret), |  | ||||||
|                K(sys_schema_version), K(sys_local_version)); |  | ||||||
|     } |     } | ||||||
|   } |   } | ||||||
|  |  | ||||||
| @ -135,9 +133,9 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version, | |||||||
|       } else if (tenant_schema_version > tenant_local_version) { |       } else if (tenant_schema_version > tenant_local_version) { | ||||||
|         // The local schema version is behind. At this point, |         // The local schema version is behind. At this point, | ||||||
|         // you need to refresh the schema version and reacquire schema_guard |         // you need to refresh the schema version and reacquire schema_guard | ||||||
|         if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(tenant_id, tenant_schema_version))) { |         if (OB_FAIL(try_refresh_schema_(tenant_id, tenant_schema_version, session_info->is_inner()))) { | ||||||
|           LOG_WARN("fail to push back effective_tenant_id", K(ret), K(tenant_schema_version), |           LOG_WARN("fail to try refresh tenant schema", KR(ret), K(tenant_id), | ||||||
|               K(tenant_id), K(tenant_local_version)); |                     K(tenant_schema_version), K(tenant_local_version)); | ||||||
|         } else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard( |         } else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard( | ||||||
|             tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) { |             tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) { | ||||||
|           LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version)); |           LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version)); | ||||||
| @ -831,6 +829,43 @@ void ObRemoteBaseExecuteP<T>::base_cleanup() | |||||||
|   return; |   return; | ||||||
| } | } | ||||||
|  |  | ||||||
|  | template<typename T> | ||||||
|  | int ObRemoteBaseExecuteP<T>::try_refresh_schema_(const uint64_t tenant_id, | ||||||
|  |                                                  const int64_t schema_version, | ||||||
|  |                                                  const bool is_inner_sql) | ||||||
|  | { | ||||||
|  |   int ret = OB_SUCCESS; | ||||||
|  |   const int64_t timeout_remain = THIS_WORKER.get_timeout_remain(); | ||||||
|  |   if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) { | ||||||
|  |     ret = OB_INVALID_ARGUMENT; | ||||||
|  |     LOG_WARN("arg is invalid", KR(ret), K(tenant_id)); | ||||||
|  |   } else if (OB_UNLIKELY(timeout_remain <= 0)) { | ||||||
|  |     ret = OB_TIMEOUT; | ||||||
|  |     LOG_WARN("THIS_WORKER is timeout", KR(ret), K(tenant_id)); | ||||||
|  |   } else if (OB_ISNULL(gctx_.schema_service_)) { | ||||||
|  |     ret = OB_ERR_UNEXPECTED; | ||||||
|  |     LOG_WARN("schema service is NULL", KR(ret), K(tenant_id)); | ||||||
|  |   } else { | ||||||
|  |     const int64_t orig_timeout_ts = THIS_WORKER.get_timeout_ts(); | ||||||
|  |     const int64_t try_refresh_time = is_inner_sql ? timeout_remain : std::min(10 * 1000L, timeout_remain); | ||||||
|  |     THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + try_refresh_time); | ||||||
|  |     if (OB_FAIL(gctx_.schema_service_->async_refresh_schema( | ||||||
|  |                 tenant_id, schema_version))) { | ||||||
|  |       LOG_WARN("fail to refresh schema", KR(ret), K(tenant_id), | ||||||
|  |                                          K(schema_version), K(try_refresh_time)); | ||||||
|  |     } | ||||||
|  |     THIS_WORKER.set_timeout_ts(orig_timeout_ts); | ||||||
|  |     if (OB_TIMEOUT == ret | ||||||
|  |         && THIS_WORKER.is_timeout_ts_valid() | ||||||
|  |         && !THIS_WORKER.is_timeout()) { | ||||||
|  |       ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH; | ||||||
|  |       LOG_WARN("fail to refresh schema in try refresh time", KR(ret), K(tenant_id), | ||||||
|  |                 K(schema_version), K(try_refresh_time)); | ||||||
|  |     } | ||||||
|  |   } | ||||||
|  |   return ret; | ||||||
|  | } | ||||||
|  |  | ||||||
| int ObRpcRemoteExecuteP::init() | int ObRpcRemoteExecuteP::init() | ||||||
| { | { | ||||||
|   int ret = OB_SUCCESS; |   int ret = OB_SUCCESS; | ||||||
|  | |||||||
| @ -77,6 +77,12 @@ protected: | |||||||
|                                  int &err, |                                  int &err, | ||||||
|                                  ObSQLSessionInfo &session, |                                  ObSQLSessionInfo &session, | ||||||
|                                  int64_t &retry_times); |                                  int64_t &retry_times); | ||||||
|  | private: | ||||||
|  |   // If an inner SQL goes through this function, wait for the schema refresh until the worker times out; | ||||||
|  |   // otherwise wait for at most min(10ms, timeout_remain). | ||||||
|  |   int try_refresh_schema_(const uint64_t tenant_id, | ||||||
|  |                           const int64_t schema_version, | ||||||
|  |                           const bool is_inner_sql); | ||||||
|  |  | ||||||
| protected: | protected: | ||||||
|   const observer::ObGlobalContext &gctx_; |   const observer::ObGlobalContext &gctx_; | ||||||
|  | |||||||
		Reference in New Issue
	
	Block a user
	 obdev
					obdev