[CP] [FEAT MERGE]support dblink udf

This commit is contained in:
seuwebber
2024-03-21 04:45:36 +00:00
committed by ob-robot
parent 6d7fab4ff1
commit 5bdd7e5bd1
61 changed files with 1065 additions and 401 deletions

View File

@ -218,7 +218,8 @@ int ObCallProcedureExecutor::execute(ObExecContext &ctx, ObCallProcedureStmt &st
ctx.get_allocator(),
NULL,
stmt.get_dblink_routine_info(),
params))) {
params,
NULL))) {
LOG_WARN("failed to execute dblink pl", K(ret), KP(stmt.get_dblink_routine_info()));
#endif
}

View File

@ -951,8 +951,7 @@ int ObExprCast::fill_element(const sql::ObExpr &expr,
int64_t init_size = OB_INVALID_SIZE;
if (OB_FAIL(collection_type->get_element_type().newx(*coll->get_allocator(), ns, ptr))) {
LOG_WARN("failed to new element", K(ret));
} else if (OB_FAIL(collection_type->get_element_type().get_size(*ns,
pl::PL_TYPE_INIT_SIZE,
} else if (OB_FAIL(collection_type->get_element_type().get_size(pl::PL_TYPE_INIT_SIZE,
init_size))) {
LOG_WARN("failed to get size", K(ret));
} else {

View File

@ -291,7 +291,7 @@ int ObExprCollectionConstruct::eval_collection_construct(const ObExpr &expr,
int64_t ptr = 0;
int64_t init_size = OB_INVALID_SIZE;
OZ (collection_type->get_element_type().newx(*coll->get_allocator(), ns, ptr));
OZ (collection_type->get_element_type().get_size(*ns, pl::PL_TYPE_INIT_SIZE, init_size));
OZ (collection_type->get_element_type().get_size(pl::PL_TYPE_INIT_SIZE, init_size));
OX (new_composite.set_extend(ptr, collection_type->get_element_type().get_type(), init_size));
OX (static_cast<ObObj*>(coll->get_data())[i] = new_composite);
} else if (pl::PL_OPAQUE_TYPE == v.get_meta().get_extend_type()

View File

@ -60,7 +60,9 @@ int ObExprGetPackageVar::calc(ObObj &result,
} else if (OB_ISNULL(pl_engine = session_info->get_pl_engine())) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("pl engine is null", K(ret));
} else if (OB_ISNULL(package_guard = exec_ctx->get_package_guard())) {
} else if (OB_FAIL(exec_ctx->get_package_guard(package_guard))) {
LOG_WARN("get package guard failed", K(ret));
} else if (OB_ISNULL(package_guard)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("package guard is null", K(ret));
} else if (OB_NOT_NULL(exec_ctx->get_sql_ctx())

View File

@ -307,13 +307,18 @@ int ObExprObjAccess::ExtraInfo::get_record_attr(const pl::ObObjAccessIdx &curren
CK (OB_NOT_NULL(ns));
OZ (ns->get_user_type(udt_id, user_type));
} else {
pl::ObPLResolveCtx resolve_ctx(alloc,
*ctx.exec_ctx_.get_my_session(),
*ctx.exec_ctx_.get_sql_ctx()->schema_guard_,
*ctx.exec_ctx_.get_package_guard(),
*ctx.exec_ctx_.get_sql_proxy(),
false);
OZ (resolve_ctx.get_user_type(udt_id, user_type));
pl::ObPLPackageGuard *package_guard = NULL;
OZ (ctx.exec_ctx_.get_package_guard(package_guard));
CK (OB_NOT_NULL(package_guard));
if (OB_SUCC(ret)) {
pl::ObPLResolveCtx resolve_ctx(alloc,
*ctx.exec_ctx_.get_my_session(),
*ctx.exec_ctx_.get_sql_ctx()->schema_guard_,
*package_guard,
*ctx.exec_ctx_.get_sql_proxy(),
false);
OZ (resolve_ctx.get_user_type(udt_id, user_type));
}
}
CK (OB_NOT_NULL(user_type));
CK (user_type->is_record_type());

View File

@ -144,7 +144,7 @@ int ObExprObjectConstruct::newx(ObEvalCtx &ctx, ObObj &result, uint64_t udt_id)
OZ (ns->get_user_type(udt_id, user_type, &tmp_alloc));
CK (OB_NOT_NULL(user_type));
OZ (user_type->newx(alloc, ns, ptr));
OZ (user_type->get_size(*ns, pl::PL_TYPE_INIT_SIZE, init_size));
OZ (user_type->get_size(pl::PL_TYPE_INIT_SIZE, init_size));
OX (new_composite.set_extend(ptr, user_type->get_type(), init_size));
OX (result = new_composite);
}

View File

@ -209,7 +209,7 @@ int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx,
CK (OB_NOT_NULL(collection = reinterpret_cast<pl::ObPLCollection*>(ptr)));
OX (collection->set_count(0));
}
OZ (collection_type->get_element_type().get_size(*pl_exec_ctx, pl::PL_TYPE_INIT_SIZE, init_size));
OZ (collection_type->get_element_type().get_size(pl::PL_TYPE_INIT_SIZE, init_size));
OX (row->set_extend(ptr, collection_type->get_element_type().get_type(), init_size));
}
if (OB_SUCC(ret)) {

View File

@ -330,8 +330,7 @@ int ObExprSubQueryRef::expr_eval(
ObSQLSessionInfo *session = ctx.exec_ctx_.get_my_session();
CK (OB_NOT_NULL(ctx.exec_ctx_.get_sql_ctx()));
CK (OB_NOT_NULL(ctx.exec_ctx_.get_sql_ctx()->schema_guard_));
OZ (pl_type.get_size(pl::ObPLUDTNS(*ctx.exec_ctx_.get_sql_ctx()->schema_guard_),
pl::PL_TYPE_INIT_SIZE, param_size));
OZ (pl_type.get_size(pl::PL_TYPE_INIT_SIZE, param_size));
CK (OB_NOT_NULL(data = static_cast<char *>(
ctx.exec_ctx_.get_allocator().alloc(param_size))));
CK (OB_NOT_NULL(obj = static_cast<ObObj *>(

View File

@ -741,7 +741,8 @@ int ObExprUDF::eval_udf(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &res)
false,
true,
info->loc_,
info->is_called_in_sql_))) {
info->is_called_in_sql_,
info->dblink_id_))) {
LOG_WARN("fail to execute udf", K(ret), K(info), K(package_id), K(tmp_result));
if (info->is_called_in_sql_ && OB_NOT_NULL(ctx.exec_ctx_.get_pl_ctx())) {
ctx.exec_ctx_.get_pl_ctx()->reset_obj_range_to_end(cur_obj_count);
@ -958,6 +959,7 @@ int ObExprUDFInfo::deep_copy(common::ObIAllocator &allocator,
other.loc_ = loc_;
other.is_udt_cons_ = is_udt_cons_;
other.is_called_in_sql_ = is_called_in_sql_;
other.dblink_id_ = dblink_id_;
OZ(other.subprogram_path_.assign(subprogram_path_));
OZ(other.params_type_.assign(params_type_));
OZ(other.params_desc_.assign(params_desc_));
@ -980,6 +982,7 @@ int ObExprUDFInfo::from_raw_expr(RE &raw_expr)
is_udt_udf_ = udf_expr.get_is_udt_udf();
loc_ = udf_expr.get_loc();
is_udt_cons_ = udf_expr.get_is_udt_cons();
dblink_id_ = udf_expr.get_dblink_id();
return ret;
}

View File

@ -39,7 +39,8 @@ struct ObExprUDFInfo : public ObIExprExtraInfo
public:
ObExprUDFInfo(common::ObIAllocator &alloc, ObExprOperatorType type)
: ObIExprExtraInfo(alloc, type),
subprogram_path_(alloc), params_type_(alloc), params_desc_(alloc), nocopy_params_(alloc)
subprogram_path_(alloc), params_type_(alloc), params_desc_(alloc), nocopy_params_(alloc),
dblink_id_(OB_INVALID_ID)
{
}
@ -61,6 +62,7 @@ public:
uint64_t loc_;
bool is_udt_cons_;
bool is_called_in_sql_;
uint64_t dblink_id_;
};
class ObSqlCtx;
class ObUDFParamDesc;

View File

@ -990,6 +990,17 @@ pl::ObPLPackageGuard* ObExecContext::get_package_guard()
return package_guard_;
}
int ObExecContext::get_package_guard(pl::ObPLPackageGuard *&package_guard)
{
int ret = OB_SUCCESS;
package_guard = get_package_guard();
if (OB_ISNULL(package_guard)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("get package guard failed", K(ret));
}
return ret;
}
DEFINE_SERIALIZE(ObExecContext)
{
int ret = OB_SUCCESS;

View File

@ -343,7 +343,9 @@ public:
inline pl::ObPLCtx *get_pl_ctx() { return pl_ctx_; }
inline void set_pl_ctx(pl::ObPLCtx *pl_ctx) { pl_ctx_ = pl_ctx; }
pl::ObPLPackageGuard* get_package_guard();
int get_package_guard(pl::ObPLPackageGuard *&package_guard);
inline pl::ObPLPackageGuard* get_original_package_guard() { return package_guard_; }
inline void set_package_guard(pl::ObPLPackageGuard* v) { package_guard_ = v; }
int init_pl_ctx();
ObPartIdRowMapManager& get_part_row_manager() { return part_row_map_manager_; }

View File

@ -331,6 +331,9 @@ public:
inline bool has_link_table() const { return has_link_table_; }
inline void set_has_link_sfd(bool value) { has_link_sfd_ = value; }
inline bool has_link_sfd() const { return has_link_sfd_; }
inline void set_has_link_udf(bool value) { has_link_udf_ = value; }
inline bool has_link_udf() const { return has_link_udf_; }
void set_batch_size(const int64_t v) { batch_size_ = v; }
int64_t get_batch_size() const { return batch_size_; }
bool is_vectorized() const { return batch_size_ > 0; }
@ -656,6 +659,7 @@ public:
bool use_temp_table_;
bool has_link_table_;
bool has_link_sfd_;
bool has_link_udf_;
bool need_serial_exec_;//mark if need serial execute?
bool temp_sql_can_prepare_;
bool is_need_trans_;