[to #41932470] fix memory of udf

This commit is contained in:
obdev
2023-02-06 23:45:36 +08:00
committed by ob-robot
parent ec4fd2244f
commit 9b302b8660
11 changed files with 96 additions and 99 deletions

View File

@ -35,7 +35,7 @@ STATIC_ASSERT(sizeof(ObPrecision) == sizeof(ObLengthSemantics),
OB_SERIALIZE_MEMBER(ObDatumMeta, type_, cs_type_, scale_, precision_);
ObEvalCtx::ObEvalCtx(ObExecContext &exec_ctx)
ObEvalCtx::ObEvalCtx(ObExecContext &exec_ctx, ObIAllocator *allocator)
: frames_(exec_ctx.get_frames()),
max_batch_size_(0),
exec_ctx_(exec_ctx),
@ -44,7 +44,7 @@ ObEvalCtx::ObEvalCtx(ObExecContext &exec_ctx)
tmp_alloc_used_(exec_ctx.get_tmp_alloc_used()),
batch_idx_(0),
batch_size_(0),
expr_res_alloc_(exec_ctx.get_eval_res_allocator())
expr_res_alloc_((dynamic_cast<ObArenaAllocator*>(allocator) != NULL) ? (*(dynamic_cast<ObArenaAllocator*>(allocator))) : exec_ctx.get_eval_res_allocator())
{
}

View File

@ -192,7 +192,7 @@ struct ObEvalCtx
int64_t batch_idx_default_val_ = 0;
int64_t batch_size_default_val_ = 0;
};
explicit ObEvalCtx(ObExecContext &exec_ctx);
explicit ObEvalCtx(ObExecContext &exec_ctx, ObIAllocator *allocator = NULL);
explicit ObEvalCtx(ObEvalCtx &eval_ctx);
virtual ~ObEvalCtx();

View File

@ -209,7 +209,7 @@ int ObExprFrameInfo::assign(const ObExprFrameInfo &other,
return ret;
}
int ObExprFrameInfo::pre_alloc_exec_memory(ObExecContext &exec_ctx) const
int ObExprFrameInfo::pre_alloc_exec_memory(ObExecContext &exec_ctx, ObIAllocator *allocator) const
{
int ret = OB_SUCCESS;
uint64_t frame_cnt = 0;
@ -218,7 +218,7 @@ int ObExprFrameInfo::pre_alloc_exec_memory(ObExecContext &exec_ctx) const
if (NULL == (phy_ctx = exec_ctx.get_physical_plan_ctx())) {
ret = OB_INVALID_ARGUMENT;
LOG_WARN("invalid argument", K(phy_ctx), K(ret));
} else if (OB_FAIL(alloc_frame(exec_ctx.get_allocator(),
} else if (OB_FAIL(alloc_frame(allocator != NULL ? *allocator : exec_ctx.get_allocator(),
phy_ctx->get_param_frame_ptrs(),
frame_cnt,
frames))) {

View File

@ -96,7 +96,7 @@ public:
// 预分配执行过程中需要的内存, 包括frame memory和expr_ctx memory
// @param exec_ctx 执行期context,初始化其成员:frames_, frame_cnt_, expr_ctx_arr_
int pre_alloc_exec_memory(ObExecContext &exec_ctx) const;
int pre_alloc_exec_memory(ObExecContext &exec_ctx, ObIAllocator *allocator = NULL) const;
// 分配frame内存, 并将所有frame指针按每个frame idx的序存放到frames数组中
// @param [in] exec_allocator 执行期分配期

View File

@ -414,66 +414,50 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
ParamStore *udf_params = nullptr;
uint64_t udf_ctx_id = static_cast<uint64_t>(expr.expr_ctx_id_);
ObExprUDFCtx *udf_ctx = nullptr;
share::schema::ObSchemaGetterGuard schema_guard;
ObSQLSessionInfo *session = nullptr;
ObIAllocator &alloc = ctx.exec_ctx_.get_allocator();
const ObExprUDFInfo *info = static_cast<ObExprUDFInfo *>(expr.extra_info_);
ObObj *objs = nullptr;
CK(0 == expr.arg_cnt_ || OB_NOT_NULL(objs = static_cast<ObObj *> (alloc.alloc(expr.arg_cnt_ * sizeof(ObObj)))));
CK(OB_NOT_NULL(info));
bool is_stack_overflow = false;
OZ(check_stack_overflow(is_stack_overflow));
if (OB_SUCC(ret) && is_stack_overflow) {
ret = OB_SIZE_OVERFLOW;
LOG_WARN("too deep recursive",
K(ret), K(is_stack_overflow), K(info->udf_package_id_), K(info->udf_id_));
}
if (OB_SUCC(ret) && expr.arg_cnt_ != info->params_desc_.count()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("udf parameter number is not equel to params desc count",
K(ret), K(expr.arg_cnt_), K(info->params_desc_.count()), K(info->params_desc_));
}
CK(OB_NOT_NULL(session = ctx.exec_ctx_.get_my_session()));
CK(OB_NOT_NULL(pl_engine = session->get_pl_engine()));
if (OB_FAIL(ret)) {
} else if (OB_FAIL(expr.eval_param_value(ctx))) {
LOG_WARN("failed o eval param value", K(ret));
} else if (OB_FAIL(build_udf_ctx(udf_ctx_id, expr.arg_cnt_, ctx.exec_ctx_, udf_ctx))) {
LOG_WARN("failed to build_udf_ctx", K(ret), K(udf_ctx_id));
} else if (OB_ISNULL(udf_params = udf_ctx->get_param_store())) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to allocate memory", K(ret));
}
CK (OB_NOT_NULL(info));
OZ (check_types(expr, *info));
OZ (fill_obj_stack(expr, ctx, objs));
OZ (process_in_params(
objs, expr.arg_cnt_, info->params_desc_, info->params_type_, *udf_params, alloc));
// replace first param when is udt constructor,
/* for example:
* a := demo(3,4) will be rewirte to a :=demo(null, 3, 4) after resolve
* here will have to change to a := demo(self, 3, 4), which self is a object type
* who's type id is udf_package_id_
*/
CK (OB_NOT_NULL(session = ctx.exec_ctx_.get_my_session()));
CK (OB_NOT_NULL(pl_engine = session->get_pl_engine()));
OZ (expr.eval_param_value(ctx));
OZ (build_udf_ctx(udf_ctx_id, expr.arg_cnt_, ctx.exec_ctx_, udf_ctx));
CK (OB_NOT_NULL(udf_params = udf_ctx->get_param_store()));
if (OB_FAIL(ret)) {
// do nothing ...
} else if (!lib::is_oracle_mode() && info->is_udt_cons_) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("unexpected, udt constructor must be oracle mode", K(ret));
} else {
bool need_end_stmt = false;
//由于这是临时处理,用中文注释解释,便于理解
//根据和Oracle的兼容性,我们认为select pl_udf() from dual也是一种嵌套语句
//这里以前认为这种情况不属于嵌套语句,导致跟Oracle的行为存在不兼容
//由于begin_nested_session等相关的旧接口还没有去掉
//因此在这里特殊处理一下,对这种情况下,在UDF执行前调用session->set_start_stmt()
//目的是骗过nested session的一些接口的参数检查,等后续完全去掉nested session相关的接口
//这里的特殊处理也会去掉
stmt::StmtType parent_stmt = ctx.exec_ctx_.get_sql_ctx()->stmt_type_;
if (!session->has_start_stmt() && stmt::StmtType::T_SELECT == parent_stmt) {
need_end_stmt = true;
session->set_start_stmt();
}
if (info->is_udt_cons_) {
pl::ObPLCtxGuard guard(ctx.exec_ctx_.get_pl_ctx(), ret);
ObEvalCtx::TempAllocGuard memory_guard(ctx);
ObArenaAllocator &allocator = memory_guard.get_allocator();
ObObj *objs = nullptr;
if (expr.arg_cnt_ > 0) {
CK (OB_NOT_NULL(objs = static_cast<ObObj *> (allocator.alloc(expr.arg_cnt_ * sizeof(ObObj)))));
OZ (fill_obj_stack(expr, ctx, objs));
OZ (process_in_params(
objs, expr.arg_cnt_, info->params_desc_, info->params_type_, *udf_params, alloc));
}
if (OB_SUCC(ret) && info->is_udt_cons_) {
pl::ObPLUDTNS ns(*ctx.exec_ctx_.get_sql_ctx()->schema_guard_);
pl::ObPLDataType pl_type;
pl_type.set_user_type_id(pl::PL_RECORD_TYPE, info->udf_package_id_);
@ -481,8 +465,7 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
CK (0 < udf_params->count());
OZ (ns.init_complex_obj(alloc, pl_type, udf_params->at(0), false, false));
}
pl::ObPLCtxGuard guard(ctx.exec_ctx_.get_pl_ctx(), ret);
ObArenaAllocator allocator;
try {
int64_t package_id = info->is_udt_udf_ ?
share::schema::ObUDTObjectType::mask_object_id(info->udf_package_id_)
@ -508,7 +491,6 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
tmp_result,
package_id);
} catch(...) {
// OZ(after_calc_result(schema_guard, sql_ctx, ctx.exec_ctx_));
throw;
}
if (OB_FAIL(ret)) {
@ -522,7 +504,7 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
OX (ctx.exec_ctx_.get_pl_ctx()->reset_obj());
OZ (ctx.exec_ctx_.get_pl_ctx()->add(result));
} else {
OZ (deep_copy_obj(alloc, tmp_result, result));
result = tmp_result;
OX (ctx.exec_ctx_.get_pl_ctx()->reset_obj());
}
} else {
@ -535,29 +517,28 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
OX (obj_self->set_is_null(false));
}
}
//兼容oracle udf调用,如果udf是在sql(或pl的sql)中调用,则将错误码OB_READ_NOTHING覆盖为OB_SUCCESS
//函数返回结果为null,否则不对错误码进行覆盖
if (OB_READ_NOTHING == ret && info->is_called_in_sql_ && lib::is_oracle_mode()) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret) && info->is_called_in_sql_ && lib::is_oracle_mode()) {
// UDF 和系统包函数里支持出现最大长度为 32767 byte 的 raw 类型变量。
// 但如果 UDF 和系统包函数被 SQL 调用且返回值类型是 raw,则长度不允许超过 2000 byte。
if (result.is_raw() && result.get_raw().length() > OB_MAX_ORACLE_RAW_SQL_COL_LENGTH) {
ret = OB_ERR_NUMERIC_OR_VALUE_ERROR;
ObString err_msg("raw variable length too long");
LOG_WARN("raw variable length too long", K(ret), K(result.get_raw().length()));
LOG_USER_ERROR(OB_ERR_NUMERIC_OR_VALUE_ERROR, err_msg.length(), err_msg.ptr());
}
if (OB_SUCC(ret)
&& info->is_called_in_sql_
&& result.is_raw() && result.get_raw().length() > OB_MAX_ORACLE_RAW_SQL_COL_LENGTH
&& lib::is_oracle_mode()) {
ret = OB_ERR_NUMERIC_OR_VALUE_ERROR;
ObString err_msg("raw variable length too long");
LOG_WARN("raw variable length too long", K(ret), K(result.get_raw().length()));
LOG_USER_ERROR(OB_ERR_NUMERIC_OR_VALUE_ERROR, err_msg.length(), err_msg.ptr());
}
OZ (process_out_params(objs, expr.arg_cnt_,
*udf_params, alloc,
ctx.exec_ctx_, info->nocopy_params_,
info->params_desc_, info->params_type_));
OZ (process_out_params(objs,
expr.arg_cnt_,
*udf_params,
alloc,
ctx.exec_ctx_,
info->nocopy_params_,
info->params_desc_,
info->params_type_));
if (OB_SUCC(ret)) {
//TODO:@peihan.dph
//for lob locator type, pl engine return longtext type which is mismatch with datum type
//we will solve this mismatch later
if (!result.is_null()
&& result.get_type() != expr.datum_meta_.type_
&& ObLobType == expr.datum_meta_.type_) {