[CP] [IMPROVE] add new refresh schema implement to wait most 10ms
This commit is contained in:
@ -2313,11 +2313,7 @@ int ObMultiVersionSchemaService::async_refresh_schema(
|
||||
const int64_t MAX_RETRY_CNT = 100 * 1000 * 1000L / RETRY_IDLE_TIME; // 100s at most
|
||||
const int64_t SUBMIT_TASK_FREQUENCE = 2 * 1000 * 1000L / RETRY_IDLE_TIME; // each 2s
|
||||
while (OB_SUCC(ret)) {
|
||||
if (THIS_WORKER.is_timeout()
|
||||
|| (INT64_MAX == THIS_WORKER.get_timeout_ts() && retry_cnt >= MAX_RETRY_CNT)) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("already timeout", KR(ret), K(tenant_id), K(schema_version));
|
||||
} else if (OB_FAIL(get_tenant_refreshed_schema_version(
|
||||
if (OB_FAIL(get_tenant_refreshed_schema_version(
|
||||
tenant_id, local_schema_version))) {
|
||||
LOG_WARN("fail to get tenant refreshed schema version",
|
||||
KR(ret), K(tenant_id), K(schema_version));
|
||||
@ -2325,6 +2321,10 @@ int ObMultiVersionSchemaService::async_refresh_schema(
|
||||
&& (!check_formal || ObSchemaService::is_formal_version(local_schema_version))) {
|
||||
// success
|
||||
break;
|
||||
} else if (THIS_WORKER.is_timeout()
|
||||
|| (!THIS_WORKER.is_timeout_ts_valid() && retry_cnt >= MAX_RETRY_CNT)) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("already timeout", KR(ret), K(tenant_id), K(schema_version));
|
||||
} else {
|
||||
if (0 == retry_cnt % SUBMIT_TASK_FREQUENCE) {
|
||||
{
|
||||
@ -2354,8 +2354,14 @@ int ObMultiVersionSchemaService::async_refresh_schema(
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret)) {
|
||||
int64_t sleep_time = RETRY_IDLE_TIME;
|
||||
if (THIS_WORKER.is_timeout_ts_valid()
|
||||
&& THIS_WORKER.get_timeout_remain() < RETRY_IDLE_TIME) {
|
||||
int64_t timeout_remain = THIS_WORKER.get_timeout_remain();
|
||||
sleep_time = timeout_remain > 0 ? timeout_remain : 0;
|
||||
}
|
||||
retry_cnt++;
|
||||
ob_usleep(RETRY_IDLE_TIME);
|
||||
ob_usleep(static_cast<useconds_t>(sleep_time));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -105,10 +105,8 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
|
||||
if (OB_FAIL(ret)) {
|
||||
//do nothing
|
||||
} else if (sys_schema_version > sys_local_version) {
|
||||
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(
|
||||
OB_SYS_TENANT_ID, sys_schema_version))) {
|
||||
LOG_WARN("fail to push back effective_tenant_id", K(ret),
|
||||
K(sys_schema_version), K(sys_local_version));
|
||||
if (OB_FAIL(try_refresh_schema_(OB_SYS_TENANT_ID, sys_schema_version, session_info->is_inner()))) {
|
||||
LOG_WARN("fail to try refresh systenant schema", KR(ret), K(sys_schema_version), K(sys_local_version));
|
||||
}
|
||||
}
|
||||
|
||||
@ -135,9 +133,9 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
|
||||
} else if (tenant_schema_version > tenant_local_version) {
|
||||
// The local schema version is behind. At this point,
|
||||
// you need to refresh the schema version and reacquire schema_guard
|
||||
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(tenant_id, tenant_schema_version))) {
|
||||
LOG_WARN("fail to push back effective_tenant_id", K(ret), K(tenant_schema_version),
|
||||
K(tenant_id), K(tenant_local_version));
|
||||
if (OB_FAIL(try_refresh_schema_(tenant_id, tenant_schema_version, session_info->is_inner()))) {
|
||||
LOG_WARN("fail to try refresh tenant schema", KR(ret), K(tenant_id),
|
||||
K(tenant_schema_version), K(tenant_local_version));
|
||||
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
|
||||
tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) {
|
||||
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
|
||||
@ -831,6 +829,43 @@ void ObRemoteBaseExecuteP<T>::base_cleanup()
|
||||
return;
|
||||
}
|
||||
|
||||
template<typename T>
|
||||
int ObRemoteBaseExecuteP<T>::try_refresh_schema_(const uint64_t tenant_id,
|
||||
const int64_t schema_version,
|
||||
const bool is_inner_sql)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
const int64_t timeout_remain = THIS_WORKER.get_timeout_remain();
|
||||
if (OB_UNLIKELY(OB_INVALID_ID == tenant_id)) {
|
||||
ret = OB_INVALID_ARGUMENT;
|
||||
LOG_WARN("arg is invalid", KR(ret), K(tenant_id));
|
||||
} else if (OB_UNLIKELY(timeout_remain <= 0)) {
|
||||
ret = OB_TIMEOUT;
|
||||
LOG_WARN("THIS_WORKER is timeout", KR(ret), K(tenant_id));
|
||||
} else if (OB_ISNULL(gctx_.schema_service_)) {
|
||||
ret = OB_ERR_UNEXPECTED;
|
||||
LOG_WARN("schema service is NULL", KR(ret), K(tenant_id));
|
||||
} else {
|
||||
const int64_t orig_timeout_ts = THIS_WORKER.get_timeout_ts();
|
||||
const int64_t try_refresh_time = is_inner_sql ? timeout_remain : std::min(10 * 1000L, timeout_remain);
|
||||
THIS_WORKER.set_timeout_ts(ObTimeUtility::current_time() + try_refresh_time);
|
||||
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(
|
||||
tenant_id, schema_version))) {
|
||||
LOG_WARN("fail to refresh schema", KR(ret), K(tenant_id),
|
||||
K(schema_version), K(try_refresh_time));
|
||||
}
|
||||
THIS_WORKER.set_timeout_ts(orig_timeout_ts);
|
||||
if (OB_TIMEOUT == ret
|
||||
&& THIS_WORKER.is_timeout_ts_valid()
|
||||
&& !THIS_WORKER.is_timeout()) {
|
||||
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
|
||||
LOG_WARN("fail to refresh schema in try refresh time", KR(ret), K(tenant_id),
|
||||
K(schema_version), K(try_refresh_time));
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObRpcRemoteExecuteP::init()
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
|
@ -77,6 +77,12 @@ protected:
|
||||
int &err,
|
||||
ObSQLSessionInfo &session,
|
||||
int64_t &retry_times);
|
||||
private:
|
||||
// if inner sql goes through this func, we should wait refresh schema until worker timeout,
|
||||
// otherwise we will wait time in range of min(10ms, timeout_remain)
|
||||
int try_refresh_schema_(const uint64_t tenant_id,
|
||||
const int64_t schema_version,
|
||||
const bool is_inner_sql);
|
||||
|
||||
protected:
|
||||
const observer::ObGlobalContext &gctx_;
|
||||
|
Reference in New Issue
Block a user