[CP] [to #53947201] fix a -4016 problem in PX + UDF
This commit is contained in:
@ -386,6 +386,49 @@ int ObExprUDF::process_out_params(const ObObj *objs_stack,
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExprUDF::before_calc_result(share::schema::ObSchemaGetterGuard &schema_guard,
|
||||
ObSqlCtx &sql_ctx,
|
||||
ObExecContext &exec_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
// 通过SPI执行的UDF可能会存在exec_ctx_中的schema_guard是空的情况
|
||||
// 这里根据task_ctx中记录的schema_version重新获取schema_guard
|
||||
if (OB_ISNULL(exec_ctx.get_sql_ctx())
|
||||
|| OB_ISNULL(exec_ctx.get_sql_ctx()->schema_guard_)) {
|
||||
sql::ObTaskExecutorCtx &task_ctx = exec_ctx.get_task_exec_ctx();
|
||||
const observer::ObGlobalContext &gctx = observer::ObServer::get_instance().get_gctx();
|
||||
if (OB_FAIL(gctx.schema_service_->get_tenant_schema_guard(
|
||||
exec_ctx.get_my_session()->get_effective_tenant_id(),
|
||||
schema_guard,
|
||||
task_ctx.get_query_tenant_begin_schema_version(),
|
||||
task_ctx.get_query_sys_begin_schema_version()))) {
|
||||
LOG_WARN("get schema guard failed", K(ret));
|
||||
}
|
||||
}
|
||||
// 通过分布式计划执行的function没有sqlctx信息, 构造一个
|
||||
if (OB_ISNULL(exec_ctx.get_sql_ctx())) {
|
||||
sql_ctx.session_info_ = exec_ctx.get_my_session();
|
||||
sql_ctx.schema_guard_ = &schema_guard;
|
||||
exec_ctx.set_sql_ctx(&sql_ctx);
|
||||
} else if (OB_ISNULL(exec_ctx.get_sql_ctx()->schema_guard_)) {
|
||||
exec_ctx.get_sql_ctx()->schema_guard_ = &schema_guard;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExprUDF::after_calc_result(share::schema::ObSchemaGetterGuard &schema_guard,
|
||||
ObSqlCtx &sql_ctx,
|
||||
ObExecContext &exec_ctx)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
if (exec_ctx.get_sql_ctx() == &sql_ctx) {
|
||||
exec_ctx.set_sql_ctx(NULL);
|
||||
} else if (exec_ctx.get_sql_ctx()->schema_guard_ == &schema_guard) {
|
||||
exec_ctx.get_sql_ctx()->schema_guard_ = NULL;
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
int ObExprUDF::cg_expr(ObExprCGCtx &expr_cg_ctx, const ObRawExpr &raw_expr, ObExpr &rt_expr) const
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
@ -498,6 +541,17 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
|
||||
objs, expr.arg_cnt_, info->params_desc_, info->params_type_, *udf_params, alloc, &deep_in_objs));
|
||||
}
|
||||
|
||||
share::schema::ObSchemaGetterGuard schema_guard;
|
||||
ObSqlCtx sql_ctx;
|
||||
OZ (before_calc_result(schema_guard, sql_ctx, ctx.exec_ctx_));
|
||||
|
||||
// restore ctx.exec_ctx_ only when ctx.exec_ctx_ is successfully changed
|
||||
NAMED_DEFER(need_restore_exec_ctx,
|
||||
after_calc_result(schema_guard, sql_ctx, ctx.exec_ctx_));
|
||||
if (OB_FAIL(ret)) {
|
||||
need_restore_exec_ctx.deactivate();
|
||||
}
|
||||
|
||||
if (OB_SUCC(ret) && info->is_udt_cons_) {
|
||||
pl::ObPLUDTNS ns(*ctx.exec_ctx_.get_sql_ctx()->schema_guard_);
|
||||
pl::ObPLDataType pl_type;
|
||||
|
@ -158,6 +158,11 @@ public:
|
||||
const common::ObIArray<int64_t> &nocopy_params,
|
||||
const common::ObIArray<ObUDFParamDesc> ¶ms_desc,
|
||||
const common::ObIArray<ObExprResType> ¶ms_type);
|
||||
static int before_calc_result(share::schema::ObSchemaGetterGuard &schema_guard,
|
||||
ObSqlCtx &sql_ctx,
|
||||
ObExecContext &exec_ctx);
|
||||
static int after_calc_result(share::schema::ObSchemaGetterGuard &schema_guard,
|
||||
ObSqlCtx &sql_ctx, ObExecContext &exec_ctx);
|
||||
static bool need_deep_copy_in_parameter(
|
||||
const common::ObObj *objs_stack,
|
||||
int64_t param_num,
|
||||
|
Reference in New Issue
Block a user