[CP] [to #54239515] fix udf out parameter with same source

This commit is contained in:
obdev
2024-02-09 00:37:17 +00:00
committed by ob-robot
parent 83e7eee5ca
commit b659c82a16
2 changed files with 238 additions and 79 deletions

View File

@ -277,9 +277,10 @@ int ObExprUDF::process_out_params(const ObObj *objs_stack,
CK (iparams.count() == params_desc.count());
CK (0 == nocopy_params.count() || nocopy_params.count() == iparams.count());
// 先处理NoCopy参数
ObSEArray<bool, 16> dones;
for (int64_t i = 0; OB_SUCC(ret) && i < iparams.count(); ++i) {
if (!params_desc.at(i).is_out()) {
// do nothing ...
OZ (dones.push_back(true));
} else if (params_desc.at(i).is_local_out() && nocopy_params.at(i) != OB_INVALID_INDEX) {
const ParamStore &param_store = exec_ctx.get_physical_plan_ctx()->get_param_store();
int64_t position = params_desc.at(i).get_index();
@ -294,93 +295,228 @@ int ObExprUDF::process_out_params(const ObObj *objs_stack,
result));
OX (result.copy_value_or_obj(*modify, true));
OX (modify->set_param_meta());
OZ (dones.push_back(true));
} else {
OZ (dones.push_back(false));
}
}
for (int64_t i = 0; OB_SUCC(ret) && i < iparams.count(); ++i) {
if (!params_desc.at(i).is_out()) {
// do nothing ...
} else if (params_desc.at(i).is_local_out()) { //out param in paramstore of caller
if (nocopy_params.count() > 0 && nocopy_params.at(i) != OB_INVALID_INDEX) {
// nocopy parameter already process before, do nothing ....
} else {
const ParamStore &param_store = exec_ctx.get_physical_plan_ctx()->get_param_store();
int64_t position = params_desc.at(i).get_index();
ObObjParam *modify = NULL;
ObObjParam result;
CK (position < param_store.count());
CK (OB_NOT_NULL(modify = const_cast<ObObjParam*>(&(param_store.at(position)))));
// ext type cannot convert. just copy it.
if (iparams.at(i).is_ext()) {
// caller param may ref cursor, which may not allocated.
if (modify->is_null()) {
OX (iparams.at(i).copy_value_or_obj(*modify, true));
if (iparams.at(i).is_ref_cursor_type()) {
modify->set_is_ref_cursor_type(true);
}
OX (modify->set_param_meta());
} else if (!modify->is_ext()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("process function out param failed, type mismatch", K(ret),
K(iparams.at(i)), K(*modify));
} else {
OX (iparams.at(i).copy_value_or_obj(*modify, true));
OX (modify->set_param_meta());
OZ (process_singal_out_param(i,
dones,
objs_stack,
param_num,
iparams,
alloc,
exec_ctx,
nocopy_params,
params_desc,
params_type));
}
return ret;
}
int ObExprUDF::process_singal_out_param(int64_t i,
ObIArray<bool> &dones,
const ObObj *objs_stack,
int64_t param_num,
ParamStore& iparams,
ObIAllocator &alloc,
ObExecContext &exec_ctx,
const ObIArray<int64_t> &nocopy_params,
const ObIArray<ObUDFParamDesc> &params_desc,
const ObIArray<ObExprResType> &params_type)
{
int ret = OB_SUCCESS;
if (dones.at(i)) {
// already process, do nothing
} else if (params_desc.at(i).is_local_out()) { //out param in paramstore of caller
if (nocopy_params.count() > 0 && nocopy_params.at(i) != OB_INVALID_INDEX) {
// nocopy parameter already process before, do nothing ....
} else {
const ParamStore &param_store = exec_ctx.get_physical_plan_ctx()->get_param_store();
int64_t position = params_desc.at(i).get_index();
ObObjParam *modify = NULL;
ObObjParam result;
CK (position < param_store.count());
CK (OB_NOT_NULL(modify = const_cast<ObObjParam*>(&(param_store.at(position)))));
// ext type cannot convert. just copy it.
if (iparams.at(i).is_ext()) {
// caller param may ref cursor, which may not allocated.
if (modify->is_null()) {
OX (iparams.at(i).copy_value_or_obj(*modify, true));
if (iparams.at(i).is_ref_cursor_type()) {
modify->set_is_ref_cursor_type(true);
}
OX (modify->set_param_meta());
} else if (!modify->is_ext()) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("process function out param failed, type mismatch", K(ret),
K(iparams.at(i)), K(*modify));
} else {
OZ (sql::ObSPIService::spi_convert(exec_ctx.get_my_session(),
&alloc,
iparams.at(i),
params_type.at(i),
result));
OX (result.copy_value_or_obj(*modify, true));
OX (iparams.at(i).copy_value_or_obj(*modify, true));
OX (modify->set_param_meta());
}
} else {
OZ (sql::ObSPIService::spi_convert(exec_ctx.get_my_session(),
&alloc,
iparams.at(i),
params_type.at(i),
result));
OX (result.copy_value_or_obj(*modify, true));
OX (modify->set_param_meta());
}
} else if (params_desc.at(i).is_package_var_out()) {
OZ (ObSPIService::spi_set_package_variable(
&exec_ctx,
NULL,
params_desc.at(i).get_package_id(),
params_desc.at(i).get_index(),
iparams.at(i)));
} else if (params_desc.at(i).is_subprogram_var_out()) {
OZ (pl::ObPLContext::set_subprogram_var_from_local(
*exec_ctx.get_my_session(),
params_desc.at(i).get_package_id(),
params_desc.at(i).get_subprogram_id(),
params_desc.at(i).get_index(),
iparams.at(i)));
} else if (!params_type.at(i).is_ext()) {
void *ptr = NULL;
ObObj *obj = NULL;
ObObjParam result;
CK (objs_stack[i].is_ext());
OX (ptr = reinterpret_cast<void*>(objs_stack[i].get_ext()));
CK (OB_NOT_NULL(ptr));
OX (obj = reinterpret_cast<ObObj*>(ptr));
CK (OB_NOT_NULL(obj));
OZ (sql::ObSPIService::spi_convert(exec_ctx.get_my_session(),
&alloc, iparams.at(i),
params_type.at(i),
result));
OX (result.copy_value_or_obj(*obj, true));
OX (result.set_param_meta());
} else if (params_desc.at(i).is_obj_access_out() &&
OB_INVALID_ID != params_desc.at(i).get_package_id() &&
OB_INVALID_ID != params_desc.at(i).get_index()) {
ObIAllocator *pkg_allocator = NULL;
pl::ObPLExecCtx plctx(nullptr, &exec_ctx, nullptr,nullptr,nullptr,nullptr);
ObObj &obj = iparams.at(i);
OZ (ObSPIService::spi_get_package_allocator(&plctx, params_desc.at(i).get_package_id(), pkg_allocator));
if (OB_SUCC(ret) && nullptr != pkg_allocator) {
if (obj.is_ext() && obj.get_meta().get_extend_type() != pl::PL_REF_CURSOR_TYPE) {
OZ (pl::ObUserDefinedType::deep_copy_obj(*pkg_allocator, obj, obj, true));
} else {
OZ (deep_copy_obj(*pkg_allocator, obj, obj));
OX (dones.at(i) = true);
}
} else if (params_desc.at(i).is_package_var_out()) {
OZ (ObSPIService::spi_set_package_variable(
&exec_ctx,
NULL,
params_desc.at(i).get_package_id(),
params_desc.at(i).get_index(),
iparams.at(i)));
OX (dones.at(i) = true);
} else if (params_desc.at(i).is_subprogram_var_out()) {
OZ (pl::ObPLContext::set_subprogram_var_from_local(
*exec_ctx.get_my_session(),
params_desc.at(i).get_package_id(),
params_desc.at(i).get_subprogram_id(),
params_desc.at(i).get_index(),
iparams.at(i)));
OX (dones.at(i) = true);
} else if (params_desc.at(i).is_obj_access_out() &&
OB_INVALID_ID != params_desc.at(i).get_package_id() &&
OB_INVALID_ID != params_desc.at(i).get_index()) {
OZ (SMART_CALL(process_package_out_param(
i, dones, objs_stack, param_num, iparams, alloc, exec_ctx, nocopy_params, params_desc, params_type)));
} else if (!params_type.at(i).is_ext()) {
void *ptr = NULL;
ObObj *obj = NULL;
ObObjParam result;
CK (objs_stack[i].is_ext());
OX (ptr = reinterpret_cast<void*>(objs_stack[i].get_ext()));
CK (OB_NOT_NULL(ptr));
OX (obj = reinterpret_cast<ObObj*>(ptr));
CK (OB_NOT_NULL(obj));
OZ (sql::ObSPIService::spi_convert(exec_ctx.get_my_session(),
&alloc, iparams.at(i),
params_type.at(i),
result));
OX (result.copy_value_or_obj(*obj, true));
OX (result.set_param_meta());
OX (dones.at(i) = true);
}
return ret;
}
int ObExprUDF::process_package_out_param(int64_t idx,
ObIArray<bool> &dones,
const ObObj *objs_stack,
int64_t param_num,
ParamStore& iparams,
ObIAllocator &alloc,
ObExecContext &exec_ctx,
const ObIArray<int64_t> &nocopy_params,
const ObIArray<ObUDFParamDesc> &params_desc,
const ObIArray<ObExprResType> &params_type)
{
int ret = OB_SUCCESS;
// check if left out parameter is child of current
for (int64_t i = idx + 1; OB_SUCC(ret) && i < iparams.count(); ++i) {
if (!dones.at(i) && iparams.at(i).is_ext()) {
bool is_child = false;
OZ (is_child_of(iparams.at(idx), iparams.at(i), is_child));
if (OB_SUCC(ret) && is_child) {
OZ (SMART_CALL(process_singal_out_param(
i, dones, objs_stack, param_num, iparams, alloc, exec_ctx, nocopy_params, params_desc, params_type)));
}
}
}
ObIAllocator *pkg_allocator = NULL;
pl::ObPLExecCtx plctx(nullptr, &exec_ctx, nullptr,nullptr,nullptr,nullptr);
OZ (ObSPIService::spi_get_package_allocator(&plctx, params_desc.at(idx).get_package_id(), pkg_allocator));
if (OB_FAIL(ret)) {
} else if (!params_type.at(idx).is_ext()) {
void *ptr = NULL;
ObObj *obj = NULL;
ObObjParam result;
CK (objs_stack[idx].is_ext());
OX (ptr = reinterpret_cast<void*>(objs_stack[idx].get_ext()));
CK (OB_NOT_NULL(ptr));
OX (obj = reinterpret_cast<ObObj*>(ptr));
CK (OB_NOT_NULL(obj));
OZ (sql::ObSPIService::spi_convert(exec_ctx.get_my_session(),
pkg_allocator != NULL ? pkg_allocator : &alloc,
iparams.at(idx),
params_type.at(idx),
result));
OX (result.copy_value_or_obj(*obj, true));
OX (result.set_param_meta());
} else {
ObObj &obj = iparams.at(idx);
if (OB_SUCC(ret) && nullptr != pkg_allocator) {
if (obj.is_ext() && obj.get_meta().get_extend_type() != pl::PL_REF_CURSOR_TYPE) {
OZ (pl::ObUserDefinedType::deep_copy_obj(*pkg_allocator, obj, obj, true));
} else {
OZ (deep_copy_obj(*pkg_allocator, obj, obj));
}
}
if (OB_FAIL(ret)) {
int tmp = pl::ObUserDefinedType::destruct_obj(obj, exec_ctx.get_my_session());
if (OB_SUCCESS != tmp) {
LOG_WARN("fail to destruct param of udf", K(ret), K(tmp));
}
}
}
OZ (ObSPIService::spi_update_package_change_info(
&plctx, params_desc.at(idx).get_package_id(), params_desc.at(idx).get_index()));
OX (dones.at(idx) = true);
return ret;
}
int ObExprUDF::is_child_of(ObObj &parent, ObObj &child, bool &is_child)
{
int ret = OB_SUCCESS;
if (parent.is_ext() && child.is_ext() && parent.get_ext() == child.get_ext()) {
is_child = true;
} else if (parent.is_pl_extend() && parent.get_ext() != 0) {
switch (parent.get_meta().get_extend_type()) {
case pl::PL_NESTED_TABLE_TYPE:
case pl::PL_ASSOCIATIVE_ARRAY_TYPE:
case pl::PL_VARRAY_TYPE: {
pl::ObPLCollection* coll = reinterpret_cast<pl::ObPLCollection*>(parent.get_ext());
CK (OB_NOT_NULL(coll));
CK (coll->get_data());
for (int64_t i = 0; OB_SUCC(ret) && i < coll->get_count(); ++i) {
if (!(coll->get_data()[i]).is_ext()) {
ObObj tmp;
tmp.set_ext(reinterpret_cast<int64_t>(&(coll->get_data()[i])));
OZ (SMART_CALL(is_child_of(tmp, child, is_child)));
} else {
OZ (SMART_CALL(is_child_of(coll->get_data()[i], child, is_child)));
}
}
}
OZ (ObSPIService::spi_update_package_change_info(&plctx, params_desc.at(i).get_package_id(),
params_desc.at(i).get_index()));
} break;
case pl::PL_RECORD_TYPE: {
pl::ObPLRecord* record = reinterpret_cast<pl::ObPLRecord*>(parent.get_ext());
CK (OB_NOT_NULL(record));
for (int64_t i = 0; OB_SUCC(ret) && i < record->get_count(); ++i) {
ObObj *obj = NULL;
OZ (record->get_element(i, obj));
CK (OB_NOT_NULL(obj));
if (OB_FAIL(ret)) {
} else if (!obj->is_ext()) {
ObObj tmp;
tmp.set_ext(reinterpret_cast<int64_t>(obj));
OZ (SMART_CALL(is_child_of(tmp, child, is_child)));
} else {
OZ (SMART_CALL(is_child_of(*obj, child, is_child)));
}
}
} break;
default: {
} break;
}
}
return ret;

View File

@ -158,6 +158,29 @@ public:
const common::ObIArray<int64_t> &nocopy_params,
const common::ObIArray<ObUDFParamDesc> &params_desc,
const common::ObIArray<ObExprResType> &params_type);
static int is_child_of(ObObj &parent, ObObj &child, bool &is_child);
static int process_singal_out_param(int64_t i,
ObIArray<bool> &dones,
const ObObj *objs_stack,
int64_t param_num,
ParamStore& iparams,
ObIAllocator &alloc,
ObExecContext &exec_ctx,
const ObIArray<int64_t> &nocopy_params,
const ObIArray<ObUDFParamDesc> &params_desc,
const ObIArray<ObExprResType> &params_type);
static int process_package_out_param(int64_t idx,
ObIArray<bool> &dones,
const ObObj *objs_stack,
int64_t param_num,
ParamStore& iparams,
ObIAllocator &alloc,
ObExecContext &exec_ctx,
const ObIArray<int64_t> &nocopy_params,
const ObIArray<ObUDFParamDesc> &params_desc,
const ObIArray<ObExprResType> &params_type);
static int before_calc_result(share::schema::ObSchemaGetterGuard &schema_guard,
ObSqlCtx &sql_ctx,
ObExecContext &exec_ctx);