diff --git a/src/sql/ob_spi.cpp b/src/sql/ob_spi.cpp index 44b099f8c..d46897359 100644 --- a/src/sql/ob_spi.cpp +++ b/src/sql/ob_spi.cpp @@ -5410,6 +5410,32 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx, cast_ctx, tmp_result, return_types, return_type_count)); \ } +int ObSPIService::get_package_var_info_by_expr(const ObSqlExpression *expr, + uint64_t &package_id, + uint64_t &var_idx) +{ + int ret = OB_SUCCESS; + // package var need add package change to sync var to remote + CK (OB_NOT_NULL(expr)); + if (OB_FAIL(ret)) { + // do nothing + } else if (T_OP_GET_PACKAGE_VAR == get_expression_type(*expr)) { + OV (5 <= expr->get_expr_items().count(), OB_ERR_UNEXPECTED, expr->get_expr_items().count()); + CK (T_UINT64 == expr->get_expr_items().at(1).get_item_type()); + CK (T_INT == expr->get_expr_items().at(2).get_item_type()); + OX (package_id = expr->get_expr_items().at(1).get_obj().get_uint64());// pkg id + OX (var_idx = expr->get_expr_items().at(2).get_obj().get_int());// var idx + } else if (is_obj_access_expression(*expr) + && expr->get_expr_items().count() > 1 + && T_OP_GET_PACKAGE_VAR == expr->get_expr_items().at(1).get_item_type()) { + uint16_t param_pos = expr->get_expr_items().at(1).get_param_idx(); + OX (package_id = expr->get_expr_items().at(param_pos).get_obj().get_uint64()); + OX (var_idx = expr->get_expr_items().at(param_pos+1).get_obj().get_int()); + } + LOG_DEBUG("get_package_var_info_by_expr ", K(package_id), K(var_idx)); + return ret; +} + /***************************************************************************************/ /* 注意:以下是内存排列有关的代码,修改这里一定要十分理解各种数据类型在LLVM端和SQL端的内存排列和生命周期。 * SQL端的隐式赋值(Into/Bulk Collect Into)相对于显式赋值(Assign)较为简单,因为需要存储的源数据一定是从查询语句获得的 @@ -5634,11 +5660,14 @@ int ObSPIService::get_result(ObPLExecCtx *ctx, ObObjParam result_address; ObArray bulk_tables; ObArray cast_ctxs; + ObArray> package_vars_info; ObArenaAllocator tmp_allocator; OZ (bulk_tables.reserve(OB_DEFAULT_SE_ARRAY_COUNT)); OZ (cast_ctxs.reserve(OB_DEFAULT_SE_ARRAY_COUNT)); + OZ (package_vars_info.reserve(OB_DEFAULT_SE_ARRAY_COUNT)); for (int64_t i = 0; OB_SUCC(ret) && i < into_count; ++i) { ObPLCollection *table = NULL; + std::pair package_var_info = std::pair(OB_INVALID_ID, OB_INVALID_ID); // ObIAllocator *collection_allocator = NULL; CK (OB_NOT_NULL(result_expr = into_exprs[i])); CK (is_obj_access_expression(*result_expr)); @@ -5656,6 +5685,10 @@ int ObSPIService::get_result(ObPLExecCtx *ctx, OZ (spi_set_collection(ctx->exec_ctx_->get_my_session()->get_effective_tenant_id(), ctx, *allocator, *table, 0)); } + OZ (get_package_var_info_by_expr(result_expr, package_var_info.first, package_var_info.second)); + if (OB_INVALID_ID != package_var_info.first && OB_INVALID_ID != package_var_info.second) { + OX (package_vars_info.push_back(package_var_info)); + } // collection may modified by sql fetch, which can be reset and allocator will change, such like stmt a:=b in trigger // so allocator of collection can not be used by collect_cells. // CK (OB_NOT_NULL(collection_allocator = table->get_allocator())); @@ -5703,6 +5736,10 @@ int ObSPIService::get_result(ObPLExecCtx *ctx, OZ (store_result(bulk_tables, row_count, type_count, tmp_result, NULL == implicit_cursor ? false : implicit_cursor->get_in_forall())); } + // update package info + for (int64_t i = 0; OB_SUCC(ret) && i < package_vars_info.count(); i++) { + OZ (spi_update_package_change_info(ctx, package_vars_info.at(i).first, package_vars_info.at(i).second)); + } if (!for_cursor && OB_NOT_NULL(implicit_cursor)) { OX (implicit_cursor->set_rowcount(row_count)); // 设置隐式游标 } diff --git a/src/sql/ob_spi.h b/src/sql/ob_spi.h index b495ae5bd..e7e6513aa 100644 --- a/src/sql/ob_spi.h +++ b/src/sql/ob_spi.h @@ -867,6 +867,9 @@ private: const ObDataType *return_types, int64_t return_type_count); + static int get_package_var_info_by_expr(const ObSqlExpression *expr, + uint64_t &package_id, + uint64_t &var_idx); static int store_result(ObIArray &bulk_tables, int64_t row_count, int64_t column_count,