Fix the bug caused by inconsistent schema versions between the control server (ctrl_server) and the execution server (execution_server) during remote execution

This commit is contained in:
yishenglanlingzui
2023-01-06 10:09:23 +00:00
committed by ob-robot
parent d3d0e6d263
commit 678650edc8
2 changed files with 56 additions and 61 deletions

View File

@ -753,7 +753,7 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor &executor,
LOG_WARN("get tenant schema version failed", K(ret), K(ob_sql_));
} else if (OB_FAIL(res.schema_guard_.get_schema_version(OB_SYS_TENANT_ID, local_sys_schema_version))) {
LOG_WARN("get sys tenant schema version failed", K(ret), K(ob_sql_));
} else if (OB_UNLIKELY(is_extern_session())) {
} else {
res.result_set().get_exec_context().get_task_exec_ctx().set_query_tenant_begin_schema_version(local_tenant_schema_version);
res.result_set().get_exec_context().get_task_exec_ctx().set_query_sys_begin_schema_version(local_sys_schema_version);
}
@ -763,7 +763,8 @@ int ObInnerSQLConnection::query(sqlclient::ObIExecutor &executor,
// do nothing
} else if (OB_FAIL(SMART_CALL(do_query(executor, res)))) {
ret_code = ret;
LOG_WARN("execute failed", K(ret), K(tenant_id), K(executor), K(retry_cnt));
LOG_WARN("execute failed", K(ret), K(tenant_id), K(executor), K(retry_cnt),
K(local_sys_schema_version), K(local_tenant_schema_version));
ret = process_retry(res, ret, abs_timeout_us, need_retry, retry_cnt, is_from_pl);
// moved here from ObInnerSQLConnection::do_query() -> ObInnerSQLResult::open().
int close_ret = res.force_close();

View File

@ -67,24 +67,35 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
int64_t tenant_local_version = -1;
int64_t sys_local_version = -1;
int64_t query_timeout = 0;
uint64_t tenant_id = 0;
if (OB_ISNULL(gctx_.schema_service_) || OB_ISNULL(gctx_.sql_engine_)
|| OB_ISNULL(gctx_.executor_rpc_) || OB_ISNULL(session_info)) {
ret = OB_ERR_UNEXPECTED;
LOG_ERROR("schema service or sql engine is NULL", K(ret),
K(gctx_.schema_service_), K(gctx_.sql_engine_));
} else if (GET_MIN_CLUSTER_VERSION() >= CLUSTER_VERSION_4_1_0_0 &&
(tenant_schema_version == OB_INVALID_VERSION || sys_schema_version == OB_INVALID_VERSION)) {
// 4.1以及之后的版本,不允许传schema_version为-1
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tenant_schema_version and sys_schema_version",
K(tenant_schema_version), K(sys_schema_version));
} else if (FALSE_IT(tenant_id = session_info->get_effective_tenant_id())) {
// record tenant_id
} else if (tenant_id == 0) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid tanent_id", K(ret));
} else if (FALSE_IT(THIS_WORKER.set_compatibility_mode(
ORACLE_MODE == session_info->get_compatibility_mode() ?
lib::Worker::CompatMode::ORACLE : lib::Worker::CompatMode::MYSQL))) {
//设置RPC work线程的租户兼容模式
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(
session_info->get_effective_tenant_id(), tenant_local_version))) {
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(tenant_id, tenant_local_version))) {
if (OB_ENTRY_NOT_EXIST == ret) {
// 本地schema可能还没刷出来
tenant_local_version = OB_INVALID_VERSION;
ret = OB_SUCCESS;
} else {
LOG_WARN("fail to get tenant refreshed schema version", K(ret),
K(session_info->get_effective_tenant_id()));
LOG_WARN("fail to get tenant refreshed schema version", K(ret), K(tenant_id));
}
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_refreshed_schema_version(
OB_SYS_TENANT_ID, sys_local_version))) {
@ -101,67 +112,50 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
}
}
if (OB_SUCC(ret) && tenant_schema_version != tenant_local_version) {
// 获取schema_guard (tenant_local_version, sys_schema_version)
if (OB_SUCC(ret)) {
// 先取一次本地最新的schema_guard
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
session_info->get_effective_tenant_id(),
schema_guard_))) {
LOG_WARN("fail to get schema guard", K(ret),K(tenant_local_version), K(sys_local_version),
K(tenant_schema_version), K(sys_schema_version));
}
if (OB_FAIL(ret)) {
// do nothing
} else if (OB_SUCC(sql::ObSQLUtils::check_table_version(table_version_equal,
dependency_tables, schema_guard_))) {
// 检查table version返回成功
if (!table_version_equal) {
// tenant_schema_version > tenant_local_version 刷新schema,使用指定的schema版本获取guard
// tenant_schema_version < tenant_local_version 本地超前,用task指定的schema版本获取guard
if (tenant_schema_version > tenant_local_version) {
// 本地schema version落后。此时需要刷新
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(
session_info->get_effective_tenant_id(), tenant_schema_version))) {
LOG_WARN("fail to push back effective_tenant_id", K(ret),
K(session_info->get_effective_tenant_id()),
K(tenant_schema_version), K(tenant_local_version));
}
}
}
} else {
// check table version失败, 忽略错误,继续向下
// 无论本地schema版本超前还是落后于,都使用task指定的schema version
ret = OB_SUCCESS;
LOG_INFO("fail to check table_schema_version", K(ret));
if (tenant_schema_version > tenant_local_version) {
// 本地落后于task指定的schema version,刷新本地的schema
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(
session_info->get_effective_tenant_id(), tenant_schema_version))) {
LOG_WARN("fail to push back effective_tenant_id", K(ret),
K(session_info->get_effective_tenant_id()),
K(tenant_schema_version), K(tenant_local_version));
}
}
tenant_id, schema_guard_, tenant_local_version, sys_local_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_local_version),
K(sys_local_version), K(tenant_schema_version), K(sys_schema_version));
}
}
// table_version_equal == false:
// 只有当table_version_equal==true时,使用local_schema_version获取schema guard,
// 其余情况都需要需要获取task指定schema版本的schema_guard
if (OB_SUCC(ret) && !table_version_equal) {
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
session_info->get_effective_tenant_id(),
schema_guard_,
tenant_schema_version,
sys_schema_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
if (OB_SUCC(ret) && tenant_schema_version != tenant_local_version) {
// 前边一定获取过schema_guard_,并且获取的是当前server最新的schema版本
if (OB_FAIL(ObSQLUtils::check_table_version(table_version_equal, dependency_tables, schema_guard_))) {
LOG_WARN("fail to check table_version", K(ret), K(dependency_tables));
} else if (!table_version_equal) {
if (tenant_schema_version == OB_INVALID_VERSION) {
// 表版本不一致时控制端传过来的schema_version是-1,需要重试直到表版本一致
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
LOG_WARN("dependency table version is not equal, need retry", K(ret));
} else if (tenant_schema_version > tenant_local_version) {
// 本地schema version落后。此时需要刷新schema版本,并且重新获取schema_guard
if (OB_FAIL(gctx_.schema_service_->async_refresh_schema(tenant_id, tenant_schema_version))) {
LOG_WARN("fail to push back effective_tenant_id", K(ret), K(tenant_schema_version),
K(tenant_id), K(tenant_local_version));
} else if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
}
} else if (tenant_schema_version <= tenant_local_version) {
// 这种场景下需要重新取一次schema_guard
if (OB_FAIL(gctx_.schema_service_->get_tenant_schema_guard(
tenant_id, schema_guard_, tenant_schema_version, sys_schema_version))) {
LOG_WARN("fail to get schema guard", K(ret), K(tenant_schema_version), K(sys_schema_version));
}
}
} else {
// 如果table_version_equal == true,说明表schema一致,
// 此时就不需要重新获取schema_guard了,当前schema_guard就是可用的
}
}
int64_t local_tenant_schema_version = -1;
if (OB_FAIL(ret)) {
//do nothing
} else if (OB_FAIL(schema_guard_.get_schema_version(session_info->get_effective_tenant_id(), local_tenant_schema_version))) {
} else if (OB_FAIL(schema_guard_.get_schema_version(tenant_id, local_tenant_schema_version))) {
LOG_WARN("get schema version from schema_guard failed", K(ret));
} else if (OB_FAIL(session_info->get_query_timeout(query_timeout))) {
LOG_WARN("get query timeout failed", K(ret));
@ -177,13 +171,14 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
vt_ctx.schema_guard_ = &schema_guard_;
exec_ctx_.set_virtual_table_ctx(vt_ctx);
}
LOG_TRACE("print tenant_schema_version for remote_execute", K(tenant_schema_version), K(tenant_local_version),
K(local_tenant_schema_version), K(table_version_equal), K(ret), K(sys_schema_version), K(sys_local_version));
if (OB_FAIL(ret)) {
if (local_tenant_schema_version != tenant_schema_version) {
if (is_schema_error(ret)) {
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH; // 重写错误码,使得scheduler端能等待远端schema刷新并重试
}
} else if (-1 == tenant_schema_version && ret == OB_TENANT_NOT_EXIST) {
} else if (ret == OB_TENANT_NOT_EXIST) {
// fix bug: https://work.aone.alibaba-inc.com/issue/45890226
// 控制端重启observer,导致租户schema没刷出来,发送过来的schema_version异常, 让对端重试
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
@ -195,8 +190,7 @@ int ObRemoteBaseExecuteP<T>::base_before_process(int64_t tenant_schema_version,
// overwrite ret to make sure sql will retry
if (OB_NOT_NULL(session_info)
&& OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH == ret
&& GSCHEMASERVICE.is_schema_error_need_retry(
NULL, session_info->get_effective_tenant_id())) {
&& GSCHEMASERVICE.is_schema_error_need_retry(NULL, tenant_id)) {
ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
}
}