[to #48440485]fix set changed package info when bulk collect into package variable
This commit is contained in:
@ -5410,6 +5410,32 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx,
|
|||||||
cast_ctx, tmp_result, return_types, return_type_count)); \
|
cast_ctx, tmp_result, return_types, return_type_count)); \
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int ObSPIService::get_package_var_info_by_expr(const ObSqlExpression *expr,
|
||||||
|
uint64_t &package_id,
|
||||||
|
uint64_t &var_idx)
|
||||||
|
{
|
||||||
|
int ret = OB_SUCCESS;
|
||||||
|
// package var need add package change to sync var to remote
|
||||||
|
CK (OB_NOT_NULL(expr));
|
||||||
|
if (OB_FAIL(ret)) {
|
||||||
|
// do nothing
|
||||||
|
} else if (T_OP_GET_PACKAGE_VAR == get_expression_type(*expr)) {
|
||||||
|
OV (5 <= expr->get_expr_items().count(), OB_ERR_UNEXPECTED, expr->get_expr_items().count());
|
||||||
|
CK (T_UINT64 == expr->get_expr_items().at(1).get_item_type());
|
||||||
|
CK (T_INT == expr->get_expr_items().at(2).get_item_type());
|
||||||
|
OX (package_id = expr->get_expr_items().at(1).get_obj().get_uint64());// pkg id
|
||||||
|
OX (var_idx = expr->get_expr_items().at(2).get_obj().get_int());// var idx
|
||||||
|
} else if (is_obj_access_expression(*expr)
|
||||||
|
&& expr->get_expr_items().count() > 1
|
||||||
|
&& T_OP_GET_PACKAGE_VAR == expr->get_expr_items().at(1).get_item_type()) {
|
||||||
|
uint16_t param_pos = expr->get_expr_items().at(1).get_param_idx();
|
||||||
|
OX (package_id = expr->get_expr_items().at(param_pos).get_obj().get_uint64());
|
||||||
|
OX (var_idx = expr->get_expr_items().at(param_pos+1).get_obj().get_int());
|
||||||
|
}
|
||||||
|
LOG_DEBUG("get_package_var_info_by_expr ", K(package_id), K(var_idx));
|
||||||
|
return ret;
|
||||||
|
}
|
||||||
|
|
||||||
/***************************************************************************************/
|
/***************************************************************************************/
|
||||||
/* 注意:以下是内存排列有关的代码,修改这里一定要十分理解各种数据类型在LLVM端和SQL端的内存排列和生命周期。
|
/* 注意:以下是内存排列有关的代码,修改这里一定要十分理解各种数据类型在LLVM端和SQL端的内存排列和生命周期。
|
||||||
* SQL端的隐式赋值(Into/Bulk Collect Into)相对于显式赋值(Assign)较为简单,因为需要存储的源数据一定是从查询语句获得的
|
* SQL端的隐式赋值(Into/Bulk Collect Into)相对于显式赋值(Assign)较为简单,因为需要存储的源数据一定是从查询语句获得的
|
||||||
@ -5634,11 +5660,14 @@ int ObSPIService::get_result(ObPLExecCtx *ctx,
|
|||||||
ObObjParam result_address;
|
ObObjParam result_address;
|
||||||
ObArray<ObPLCollection*> bulk_tables;
|
ObArray<ObPLCollection*> bulk_tables;
|
||||||
ObArray<ObCastCtx> cast_ctxs;
|
ObArray<ObCastCtx> cast_ctxs;
|
||||||
|
ObArray<std::pair<uint64_t, uint64_t>> package_vars_info;
|
||||||
ObArenaAllocator tmp_allocator;
|
ObArenaAllocator tmp_allocator;
|
||||||
OZ (bulk_tables.reserve(OB_DEFAULT_SE_ARRAY_COUNT));
|
OZ (bulk_tables.reserve(OB_DEFAULT_SE_ARRAY_COUNT));
|
||||||
OZ (cast_ctxs.reserve(OB_DEFAULT_SE_ARRAY_COUNT));
|
OZ (cast_ctxs.reserve(OB_DEFAULT_SE_ARRAY_COUNT));
|
||||||
|
OZ (package_vars_info.reserve(OB_DEFAULT_SE_ARRAY_COUNT));
|
||||||
for (int64_t i = 0; OB_SUCC(ret) && i < into_count; ++i) {
|
for (int64_t i = 0; OB_SUCC(ret) && i < into_count; ++i) {
|
||||||
ObPLCollection *table = NULL;
|
ObPLCollection *table = NULL;
|
||||||
|
std::pair<uint64_t, uint64_t> package_var_info = std::pair<uint64_t, uint64_t>(OB_INVALID_ID, OB_INVALID_ID);
|
||||||
// ObIAllocator *collection_allocator = NULL;
|
// ObIAllocator *collection_allocator = NULL;
|
||||||
CK (OB_NOT_NULL(result_expr = into_exprs[i]));
|
CK (OB_NOT_NULL(result_expr = into_exprs[i]));
|
||||||
CK (is_obj_access_expression(*result_expr));
|
CK (is_obj_access_expression(*result_expr));
|
||||||
@ -5656,6 +5685,10 @@ int ObSPIService::get_result(ObPLExecCtx *ctx,
|
|||||||
OZ (spi_set_collection(ctx->exec_ctx_->get_my_session()->get_effective_tenant_id(),
|
OZ (spi_set_collection(ctx->exec_ctx_->get_my_session()->get_effective_tenant_id(),
|
||||||
ctx, *allocator, *table, 0));
|
ctx, *allocator, *table, 0));
|
||||||
}
|
}
|
||||||
|
OZ (get_package_var_info_by_expr(result_expr, package_var_info.first, package_var_info.second));
|
||||||
|
if (OB_INVALID_ID != package_var_info.first && OB_INVALID_ID != package_var_info.second) {
|
||||||
|
OX (package_vars_info.push_back(package_var_info));
|
||||||
|
}
|
||||||
// collection may modified by sql fetch, which can be reset and allocator will change, such like stmt a:=b in trigger
|
// collection may modified by sql fetch, which can be reset and allocator will change, such like stmt a:=b in trigger
|
||||||
// so allocator of collection can not be used by collect_cells.
|
// so allocator of collection can not be used by collect_cells.
|
||||||
// CK (OB_NOT_NULL(collection_allocator = table->get_allocator()));
|
// CK (OB_NOT_NULL(collection_allocator = table->get_allocator()));
|
||||||
@ -5703,6 +5736,10 @@ int ObSPIService::get_result(ObPLExecCtx *ctx,
|
|||||||
OZ (store_result(bulk_tables, row_count, type_count, tmp_result,
|
OZ (store_result(bulk_tables, row_count, type_count, tmp_result,
|
||||||
NULL == implicit_cursor ? false : implicit_cursor->get_in_forall()));
|
NULL == implicit_cursor ? false : implicit_cursor->get_in_forall()));
|
||||||
}
|
}
|
||||||
|
// update package info
|
||||||
|
for (int64_t i = 0; OB_SUCC(ret) && i < package_vars_info.count(); i++) {
|
||||||
|
OZ (spi_update_package_change_info(ctx, package_vars_info.at(i).first, package_vars_info.at(i).second));
|
||||||
|
}
|
||||||
if (!for_cursor && OB_NOT_NULL(implicit_cursor)) {
|
if (!for_cursor && OB_NOT_NULL(implicit_cursor)) {
|
||||||
OX (implicit_cursor->set_rowcount(row_count)); // 设置隐式游标
|
OX (implicit_cursor->set_rowcount(row_count)); // 设置隐式游标
|
||||||
}
|
}
|
||||||
|
|||||||
@ -867,6 +867,9 @@ private:
|
|||||||
const ObDataType *return_types,
|
const ObDataType *return_types,
|
||||||
int64_t return_type_count);
|
int64_t return_type_count);
|
||||||
|
|
||||||
|
static int get_package_var_info_by_expr(const ObSqlExpression *expr,
|
||||||
|
uint64_t &package_id,
|
||||||
|
uint64_t &var_idx);
|
||||||
static int store_result(ObIArray<pl::ObPLCollection*> &bulk_tables,
|
static int store_result(ObIArray<pl::ObPLCollection*> &bulk_tables,
|
||||||
int64_t row_count,
|
int64_t row_count,
|
||||||
int64_t column_count,
|
int64_t column_count,
|
||||||
|
|||||||
Reference in New Issue
Block a user