Fix the bug caused by the schema version inconsistency between the remote execution control server and the execution server

This commit is contained in:
yishenglanlingzui
2023-01-28 15:23:54 +08:00
committed by ob-robot
parent fdc52de1a2
commit f1750993cc
6 changed files with 87 additions and 65 deletions

View File

@ -67,24 +67,35 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
int64_t tenant_local_version = -1;
int64_t sys_local_version = -1;
int64_t query_timeout = 0;
uint64_t tenant_id = 0;
if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(gctx_.sql_engine_)
|| OB_ISNULL(gctx_.executor_rpc_) || OB_ISNULL(session_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema service or sql engine is NULL", K(ret),
K(gctx_.schema_service_), K(gctx_.sql_engine_));
} else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0 &&
(tenant_schema_version == OB_INVALID_VERSION || sys_schema_version == OB_INVALID_VERSION)) {
// For 4.1 and later versions, it is not allowed to pass schema_version as -1
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_schema_version and sys_schema_version", K(ret),
K(tenant_schema_version), K(sys_schema_version));
} else if (FALSE_IT(tenant_id = session_info->get_effective_tenant_id())) {
// record tenant_id
} else if (tenant_id == 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tanent_id", K(ret));
} else if (FALSE_IT(THIS_WORKER.set_compatibility_mode(
ORACLE_MODE == session_info->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL))) {
//设置RPC work线程的租户兼容模式
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(
session_info->get_effective_tenant_id(), tenant_local_version))) {
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(tenant_id, tenant_local_version))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// 本地schema可能还没刷出来
tenant_local_version = OB_INVALID_VERSION;
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to get tenant refreshed schema version", K(ret),
K(session_info->get_effective_tenant_id()));
LOG_WARN("fail to get tenant refreshed schema version", K(ret), K(tenant_id));
}
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(
OB_SYS_TENANT_ID, sys_local_version))) {
@ -101,67 +112,53 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
}
}
if (OB_SUCC(ret) && tenant_schema_version != tenant_local_version) {
// 获取schema_guard (tenant_local_version, sys_schema_version)
if (OB_SUCC(ret)) {
// First fetch the latest local schema_guard
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
session_info->get_effective_tenant_id(),
schema_guard_))) {
LOG_WARN("fail to get schema guard", K(ret),K(tenant_local_version), K(sys_local_version),
K(tenant_schema_version), K(sys_schema_version));
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_SUCC(sql::ObSQLUtils::check_table_version(table_version_equal,
dependency_tables, schema_guard_))) {
// 检查table version返回成功
if (!table_version_equal) {
// tenant_schema_version > tenant_local_version 刷新schema,使用指定的schema版本获取guard
// tenant_schema_version < tenant_local_version 本地超前,用task指定的schema版本获取guard
if (tenant_schema_version > tenant_local_version) {
// 本地schema version落后。此时需要刷新
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(
session_info->get_effective_tenant_id(), tenant_schema_version))) {
LOG_WARN("fail to push back effective_tenant_id", K(ret),
K(session_info->get_effective_tenant_id()),
K(tenant_schema_version), K(tenant_local_version));
}
}
}
} else {
// check table version失败, 忽略错误,继续向下
// 无论本地schema版本超前还是落后于,都使用task指定的schema version
ret = OB_SUCCESS;
LOG_INFO("fail to check table_schema_version", K(ret));
if (tenant_schema_version > tenant_local_version) {
// 本地落后于task指定的schema version,刷新本地的schema
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(
session_info->get_effective_tenant_id(), tenant_schema_version))) {
LOG_WARN("fail to push back effective_tenant_id", K(ret),
K(session_info->get_effective_tenant_id()),
K(tenant_schema_version), K(tenant_local_version));
}
}
tenant_id, schema_guard_, tenant_local_version, sys_local_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_local_version),
K(sys_local_version), K(tenant_schema_version), K(sys_schema_version));
}
}
// table_version_equal == false:
// 只有当table_version_equal==true时,使用local_schema_version获取schema guard
// 其余情况都需要需要获取task指定schema版本的schema_guard
if (OB_SUCC(ret) && !table_version_equal) {
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
session_info->get_effective_tenant_id(),
schema_guard_,
tenant_schema_version,
sys_schema_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
if (OB_SUCC(ret) && tenant_schema_version != tenant_local_version) {
// You must have obtained schema_guard_ before,
// and obtained the latest schema version of the current server
if (OB_FAIL(ObSQLUtils::check_table_version(table_version_equal, dependency_tables, schema_guard_))) {
LOG_WARN("fail to check table_version", K(ret), K(dependency_tables));
} else if (!table_version_equal) {
if (tenant_schema_version == OB_INVALID_VERSION) {
// When the table versions are inconsistent and the schema_version sent from the control server is -1,
// the request must be retried until the table versions are consistent
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
LOG_WARN("dependency table version is not equal, need retry", K(ret));
} else if (tenant_schema_version > tenant_local_version) {
// The local schema version is behind. At this point,
// you need to refresh the schema version and reacquire schema_guard
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(tenant_id, tenant_schema_version))) {
LOG_WARN("fail to push back effective_tenant_id", K(ret), K(tenant_schema_version),
K(tenant_id), K(tenant_local_version));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
}
} else if (tenant_schema_version <= tenant_local_version) {
// In this scenario, schema_guard needs to be taken again
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
}
}
} else {
// If table_version_equal == true, the table schema is consistent,
// At this point, there is no need to reacquire schema_guard, the current schema_guard is available
}
}
int64_t local_tenant_schema_version = -1;
if (OB_FAIL(ret)) {
//do nothing
} else if (OB_FAIL(schema_guard_.get_schema_version(session_info->get_effective_tenant_id(), local_tenant_schema_version))) {
} else if (OB_FAIL(schema_guard_.get_schema_version(tenant_id, local_tenant_schema_version))) {
LOG_WARN("get schema version from schema_guard failed", K(ret));
} else if (OB_FAIL(session_info->get_query_timeout(query_timeout))) {
LOG_WARN("get query timeout failed", K(ret));
@ -177,13 +174,14 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
vt_ctx.schema_guard_ = &schema_guard_;
exec_ctx_.set_virtual_table_ctx(vt_ctx);
}
LOG_TRACE("print tenant_schema_version for remote_execute", K(tenant_schema_version), K(tenant_local_version),
K(local_tenant_schema_version), K(table_version_equal), K(ret), K(sys_schema_version), K(sys_local_version));
if (OB_FAIL(ret)) {
if (local_tenant_schema_version != tenant_schema_version) {
if (is_schema_error(ret)) {
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH; // 重写错误码,使得scheduler端能等待远端schema刷新并重试
}
} else if (-1 == tenant_schema_version && ret == OB_TENANT_NOT_EXIST) {
} else if (ret == OB_TENANT_NOT_EXIST) {
// fix bug: https://work.aone.alibaba-inc.com/issue/45890226
// 控制端重启observer,导致租户schema没刷出来,发送过来的schema_version异常, 让对端重试
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
@ -195,8 +193,7 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
// overwrite ret to make sure sql will retry
if (OB_NOT_NULL(session_info)
&& OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == ret
&& GSCHEMASERVICE.is_schema_error_need_retry(
NULL, session_info->get_effective_tenant_id())) {
&& GSCHEMASERVICE.is_schema_error_need_retry(NULL, tenant_id)) {
ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
}
}

View File

@ -160,6 +160,8 @@ int ObRemoteScheduler::build_remote_task(ObExecContext &ctx,
remote_task.set_session(ctx.get_my_session());
remote_task.set_query_schema_version(task_exec_ctx.get_query_tenant_begin_schema_version(),
task_exec_ctx.get_query_sys_begin_schema_version());
LOG_TRACE("print schema_version", K(task_exec_ctx.get_query_tenant_begin_schema_version()),
K(task_exec_ctx.get_query_sys_begin_schema_version()));
remote_task.set_remote_sql_info(&plan_ctx->get_remote_sql_info());
ObDASTabletLoc *first_tablet_loc = DAS_CTX(ctx).get_table_loc_list().get_first()->get_first_tablet_loc();
if (OB_ISNULL(session = ctx.get_my_session())) {