Fix PX UDF retrying until timeout when the schema does not exist
This commit is contained in:
parent
daca15f707
commit
f32c53fdc8
@ -116,8 +116,34 @@ int ObInterruptUtil::interrupt_tasks(ObPxSqcMeta &sqc, int code)
|
||||
return ret;
|
||||
}
|
||||
|
||||
// Rewrites a schema error code into one of the two "remote schema" error codes.
//
// `code` is rewritten only when ALL of the following hold:
//   - `code` is a schema error according to is_schema_error();
//   - `exec_ctx` and its session are both non-null;
//   - the schema service is available and the tenant's local schema version can
//     be read through a tenant schema guard;
//   - that local schema version differs from the value returned by
//     get_query_tenant_begin_schema_version() on the task exec ctx.
// When rewritten, `code` becomes OB_ERR_REMOTE_SCHEMA_NOT_FULL if
// GSCHEMASERVICE.is_schema_error_need_retry() returns true, and
// OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH otherwise. In every other case `code` is
// left unchanged.
//
// @param exec_ctx  execution context used to find the session/tenant; may be NULL.
// @param code      [in,out] error code to inspect and possibly rewrite.
void ObInterruptUtil::update_schema_error_code(ObExecContext *exec_ctx, int &code)
{
  // `ret` only tracks failures of the schema lookups below for logging; it is
  // never returned, and a failure here simply leaves `code` as it was.
  int ret = OB_SUCCESS;
  if (is_schema_error(code) && OB_NOT_NULL(exec_ctx) && OB_NOT_NULL(exec_ctx->get_my_session())) {
    uint64_t tenant_id = exec_ctx->get_my_session()->get_effective_tenant_id();
    ObSchemaGetterGuard schema_guard;
    int64_t local_schema_version = -1;
    if (OB_ISNULL(GCTX.schema_service_)) {
      // Guard against dereferencing a missing schema service; keep `code` as is.
      ret = OB_ERR_UNEXPECTED;
      LOG_WARN("schema service is null", K(ret));
    } else if (OB_FAIL(GCTX.schema_service_->get_tenant_schema_guard(tenant_id, schema_guard))) {
      LOG_WARN("get tenant schema guard failed", K(ret));
    } else if (OB_FAIL(schema_guard.get_schema_version(tenant_id, local_schema_version))) {
      LOG_WARN("get schema version failed", K(ret));
    } else if (local_schema_version !=
               exec_ctx->get_task_exec_ctx().get_query_tenant_begin_schema_version()) {
      // The local schema version is not the one recorded at query begin:
      // rewrite the code to one of the remote-schema error codes.
      if (GSCHEMASERVICE.is_schema_error_need_retry(NULL, tenant_id)) {
        code = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
      } else {
        code = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
      }
    }
    LOG_TRACE("update_schema_error_code, exec_ctx is not null", K(tenant_id), K(local_schema_version),
              K(exec_ctx->get_task_exec_ctx().get_query_tenant_begin_schema_version()), K(lbt()));
  } else {
    // NOTE: this branch is also taken when `code` is not a schema error or when
    // the session is null, not only when `exec_ctx` itself is null.
    LOG_TRACE("update_schema_error_code, exec_ctx is null", K(lbt()));
  }
}
|
||||
|
||||
// SQC 向 QC 发送中断
|
||||
int ObInterruptUtil::interrupt_qc(ObPxSqcMeta &sqc, int code)
|
||||
int ObInterruptUtil::interrupt_qc(ObPxSqcMeta &sqc, int code, ObExecContext *exec_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObInterruptCode int_code(code,
|
||||
@ -128,15 +154,7 @@ int ObInterruptUtil::interrupt_qc(ObPxSqcMeta &sqc, int code)
|
||||
ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance();
|
||||
ObInterruptibleTaskID interrupt_id = sqc.get_interrupt_id().query_interrupt_id_;
|
||||
|
||||
// 重写错误码,使得scheduler端能等待远端schema刷新并重试
|
||||
if (is_schema_error(int_code.code_)) {
|
||||
if (GSCHEMASERVICE.is_schema_error_need_retry(NULL, MTL_ID())) {
|
||||
int_code.code_ = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
|
||||
} else {
|
||||
int_code.code_ = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
|
||||
}
|
||||
}
|
||||
|
||||
update_schema_error_code(exec_ctx, int_code.code_);
|
||||
if (OB_ISNULL(manager)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
} else if (OB_FAIL(manager->interrupt(sqc.get_qc_addr(),
|
||||
@ -161,7 +179,7 @@ int ObInterruptUtil::interrupt_qc(ObPxSqcMeta &sqc, int code)
|
||||
|
||||
|
||||
// Task 向 QC 发送中断
|
||||
int ObInterruptUtil::interrupt_qc(ObPxTask &task, int code)
|
||||
int ObInterruptUtil::interrupt_qc(ObPxTask &task, int code, ObExecContext *exec_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
ObInterruptCode int_code(code,
|
||||
@ -172,15 +190,7 @@ int ObInterruptUtil::interrupt_qc(ObPxTask &task, int code)
|
||||
ObGlobalInterruptManager *manager = ObGlobalInterruptManager::getInstance();
|
||||
ObInterruptibleTaskID interrupt_id = task.get_interrupt_id().query_interrupt_id_;
|
||||
|
||||
// 重写错误码,使得scheduler端能等待远端schema刷新并重试
|
||||
if (is_schema_error(int_code.code_)) {
|
||||
if (GSCHEMASERVICE.is_schema_error_need_retry(NULL, MTL_ID())) {
|
||||
int_code.code_ = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
|
||||
} else {
|
||||
int_code.code_ = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
|
||||
}
|
||||
}
|
||||
|
||||
update_schema_error_code(exec_ctx, int_code.code_);
|
||||
if (OB_ISNULL(manager)) {
|
||||
ret = OB_ALLOCATE_MEMORY_FAILED;
|
||||
} else if (OB_FAIL(manager->interrupt(task.get_qc_addr(),
|
||||
|
@ -71,9 +71,10 @@ public:
|
||||
static int interrupt_tasks(ObPxSqcMeta &sqc, int code);
|
||||
// DFO 重试时,需要使用新的中断号,避免遇到中断残余,被误中断
|
||||
static int regenerate_interrupt_id(ObDfo &dfo);
|
||||
static void update_schema_error_code(ObExecContext *exec_ctx, int &code);
|
||||
// SQC 以及 Tasks 向 QC 发送中断
|
||||
static int interrupt_qc(ObPxSqcMeta &sqc, int code);
|
||||
static int interrupt_qc(ObPxTask &task, int code);
|
||||
static int interrupt_qc(ObPxSqcMeta &sqc, int code, ObExecContext *exec_ctx);
|
||||
static int interrupt_qc(ObPxTask &task, int code, ObExecContext *exec_ctx);
|
||||
// 将server_id、execution_id、qc_id共同组成中断id
|
||||
static int generate_query_interrupt_id(const uint32_t server_id,
|
||||
const uint64_t px_sequence_id,
|
||||
|
@ -118,13 +118,8 @@ int ObInitSqcP::process()
|
||||
}
|
||||
|
||||
//
|
||||
if (OB_SUCCESS != ret && is_schema_error(ret)) {
|
||||
if (OB_NOT_NULL(sqc_handler)
|
||||
&& GSCHEMASERVICE.is_schema_error_need_retry(NULL, sqc_handler->get_tenant_id())) {
|
||||
ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
|
||||
} else {
|
||||
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
|
||||
}
|
||||
if (OB_SUCCESS != ret && is_schema_error(ret) && OB_NOT_NULL(sqc_handler)) {
|
||||
ObInterruptUtil::update_schema_error_code(&(sqc_handler->get_exec_ctx()), ret);
|
||||
}
|
||||
// 非rpc框架的错误内容设置到response消息中
|
||||
// rpc框架的错误码在process中返回OB_SUCCESS
|
||||
@ -423,13 +418,8 @@ int ObInitFastSqcP::process()
|
||||
}
|
||||
|
||||
//
|
||||
if (OB_SUCCESS != ret && is_schema_error(ret)) {
|
||||
if (OB_NOT_NULL(sqc_handler)
|
||||
&& GSCHEMASERVICE.is_schema_error_need_retry(NULL, sqc_handler->get_tenant_id())) {
|
||||
ret = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
|
||||
} else {
|
||||
ret = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
|
||||
}
|
||||
if (OB_SUCCESS != ret && is_schema_error(ret) && OB_NOT_NULL(sqc_handler)) {
|
||||
ObInterruptUtil::update_schema_error_code(&(sqc_handler->get_exec_ctx()), ret);
|
||||
}
|
||||
|
||||
ObActiveSessionGuard::get_stat().in_sql_execution_ = false;
|
||||
|
@ -491,12 +491,7 @@ int ObPxSQCProxy::report(int end_ret) const
|
||||
finish_msg.rc_ = sqc_ret;
|
||||
// 重写错误码,使得scheduler端能等待远端schema刷新并重试
|
||||
if (OB_SUCCESS != sqc_ret && is_schema_error(sqc_ret)) {
|
||||
if (OB_NOT_NULL(session)
|
||||
&& GSCHEMASERVICE.is_schema_error_need_retry(NULL, session->get_effective_tenant_id())) {
|
||||
finish_msg.rc_ = OB_ERR_REMOTE_SCHEMA_NOT_FULL;
|
||||
} else {
|
||||
finish_msg.rc_ = OB_ERR_WAIT_REMOTE_SCHEMA_REFRESH;
|
||||
}
|
||||
ObInterruptUtil::update_schema_error_code(sqc_arg.exec_ctx_, finish_msg.rc_);
|
||||
}
|
||||
|
||||
// 如果 session 为 null,rc 不会为 SUCCESS,没有设置 trans_result 也无妨
|
||||
|
@ -108,7 +108,7 @@ int ObPxSubCoord::pre_process()
|
||||
if (IS_INTERRUPTED()) {
|
||||
// 当前是被QC中断的,不再向QC发送中断
|
||||
} else {
|
||||
(void) ObInterruptUtil::interrupt_qc(sqc_arg_.sqc_, ret);
|
||||
(void) ObInterruptUtil::interrupt_qc(sqc_arg_.sqc_, ret, sqc_arg_.exec_ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -482,7 +482,7 @@ int ObPxTaskProcess::do_process()
|
||||
&& ObVirtualTableErrorWhitelist::should_ignore_vtable_error(ret)) {
|
||||
// 忽略虚拟表错误
|
||||
} else {
|
||||
(void) ObInterruptUtil::interrupt_qc(arg_.task_, ret);
|
||||
(void) ObInterruptUtil::interrupt_qc(arg_.task_, ret, arg_.exec_ctx_);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -201,7 +201,7 @@ void PxWorkerFunctor::operator ()()
|
||||
if (OB_FAIL(runtime_arg.init_deserialize_param(mem_context, *env_arg_.get_gctx()))) {
|
||||
LOG_WARN("fail to init args", K(ret));
|
||||
} else if (OB_FAIL(runtime_arg.deep_copy_assign(task_arg_, mem_context->get_arena_allocator()))) {
|
||||
(void) ObInterruptUtil::interrupt_qc(task_arg_.task_, ret);
|
||||
(void) ObInterruptUtil::interrupt_qc(task_arg_.task_, ret, task_arg_.exec_ctx_);
|
||||
LOG_WARN("fail deep copy assign arg", K(task_arg_), K(ret));
|
||||
} else {
|
||||
// 绑定sqc_handler,方便算子任何地方都可以拿sqc_handle
|
||||
|
Loading…
x
Reference in New Issue
Block a user