[CP] covert the result set to return type when fetching cursor specified return type

This commit is contained in:
obdev 2022-12-29 03:08:14 +00:00 committed by ob-robot
parent 9e1c3039b0
commit 50076ceb7e
6 changed files with 333 additions and 142 deletions

View File

@ -1104,6 +1104,8 @@ int ObPLCodeGenerateVisitor::visit(const ObPLCursorForLoopStmt &s)
s.get_cursor()->get_routine_id(),
s.get_index(),
static_cast<int64_t>(INT64_MAX),
s.get_user_type(),
s.get_expand_user_types(),
ret_err));
OZ (generator_.get_helper().create_icmp_eq(ret_err, OB_READ_NOTHING, is_not_found));
OZ (generator_.get_helper().stack_restore(stack));
@ -2687,6 +2689,8 @@ int ObPLCodeGenerateVisitor::visit(const ObPLFetchStmt &s)
s.get_routine_id(),
s.get_index(),
s.get_limit(),
s.get_user_type(),
s.get_expand_user_types(),
ret_err))) {
LOG_WARN("failed to generate fetch", K(ret));
} else if (lib::is_mysql_mode()) { //Mysql模式直接检查抛出异常
@ -3603,6 +3607,8 @@ int ObPLCodeGenerator::init_spi_service()
OZ (arg_types.push_back(int_pointer_type));//pl_integer_ranges
OZ (arg_types.push_back(bool_type));//is_bulk
OZ (arg_types.push_back(int64_type));//limit
OZ (arg_types.push_back(data_type_pointer_type));//return_type
OZ (arg_types.push_back(int64_type));//return_type_count
OZ (ObLLVMFunctionType::get(int32_type, arg_types, ft));
OZ (helper_.create_function(ObString("spi_cursor_fetch"), ft, spi_service_.spi_cursor_fetch_));
}
@ -4548,6 +4554,8 @@ int ObPLCodeGenerator::generate_fetch(const ObPLStmt &s,
const uint64_t &routine_id,
const int64_t &cursor_index,
const int64_t &limit,
const ObUserDefinedType *user_defined_type,
const ObIArray<ObDataType> &user_types,
jit::ObLLVMValue &ret_err)
{
int ret = OB_SUCCESS;
@ -4622,6 +4630,45 @@ int ObPLCodeGenerator::generate_fetch(const ObPLStmt &s,
OZ (args.push_back(limit_value));
}
ObLLVMValue return_type_array_value;
ObLLVMValue return_type_count_value;
ObLLVMValue type_value;
ObLLVMType data_type;
ObLLVMType data_type_pointer;
ObLLVMType array_type;
if (OB_ISNULL(user_defined_type)) {
OZ (adt_service_.get_data_type(data_type));
OZ (data_type.get_pointer_to(data_type_pointer));
OZ (ObLLVMHelper::get_null_const(data_type_pointer, return_type_array_value));
OZ (helper_.get_int64(0, return_type_count_value));
} else {
OZ (adt_service_.get_data_type(data_type));
OZ (data_type.get_pointer_to(data_type_pointer));
OZ (ObLLVMHelper::get_array_type(data_type,
user_types.count(),
array_type));
OZ (helper_.create_alloca(ObString("datatype_array"),
array_type,
return_type_array_value));
for (int64_t i = 0; OB_SUCC(ret) && i < user_types.count(); ++i) {
type_value.reset();
OZ (helper_.create_gep(ObString("extract_datatype"),
return_type_array_value,
i, type_value));
OZ (store_data_type(user_types.at(i), type_value));
}
OZ (helper_.create_bit_cast(ObString("datatype_array_to_pointer"),
return_type_array_value, data_type_pointer, return_type_array_value));
OZ (helper_.get_int64(static_cast<int64_t>(user_types.count()),
return_type_count_value));
}
OZ (args.push_back(return_type_array_value));
OZ (args.push_back(return_type_count_value));
OZ (get_helper().create_call(ObString("spi_cursor_fetch"),
get_spi_service().spi_cursor_fetch_,
args, ret_err));

View File

@ -262,6 +262,8 @@ public:
const uint64_t &routine_id,
const int64_t &cursor_index,
const int64_t &limit,
const ObUserDefinedType *user_defined_type,
const ObIArray<ObDataType> &user_types,
jit::ObLLVMValue &ret_err);
int generate_close(const ObPLStmt &s,
const uint64_t &package_id,

View File

@ -3507,6 +3507,9 @@ int ObPLResolver::resolve_cursor_for_loop(
} else {
stmt->set_user_type(user_type);
stmt->set_index_index(func.get_symbol_table().get_count() - 1);
if (OB_FAIL(body_block->get_namespace().expand_data_type(user_type, stmt->get_expand_user_types()))) {
LOG_WARN("fail to expand data type", K(ret));
}
}
}
// 将index作为fetch into变量
@ -6874,27 +6877,37 @@ int ObPLResolver::resolve_fetch(
} else {
bool is_compatible = true;
const ObRecordType *return_type = static_cast<const ObRecordType*>(cursor_type);
if (return_type->get_record_member_count() != stmt->get_data_type().count()
stmt->set_user_type(cursor_type);
if (OB_FAIL(current_block_->get_namespace().expand_data_type(cursor_type, stmt->get_expand_user_types()))) {
LOG_WARN("fail to expand data type", K(ret));
} else if (return_type->get_record_member_count() != stmt->get_data_type().count()
&& return_type->get_record_member_count() != stmt->get_into().count()) {
ret = OB_ERR_WRONG_FETCH_INTO_NUM;
LOG_WARN("wrong number of values in the INTO list of a FETCH statement", K(ret));
} else {
bool has_record_type = false;
for (int64_t i = 0;
OB_SUCC(ret) && is_compatible && i < return_type->get_record_member_count();
++i) {
const ObPLDataType *left = return_type->get_record_member_type(i);
ObDataType right;
if (return_type->get_record_member_count() == stmt->get_data_type().count()) {
right = stmt->get_data_type(i);
} else {
const ObRawExpr *raw_expr = func.get_expr(stmt->get_into(i));
CK (return_type->get_record_member_count() == stmt->get_into().count());
CK (OB_NOT_NULL(raw_expr));
OV (raw_expr->get_result_type().is_ext(), OB_ERR_UNEXPECTED, KPC(raw_expr));
OX (right.set_meta_type(raw_expr->get_result_type()));
OX (right.set_accuracy(raw_expr->get_result_type().get_accuracy()));
}
CK (OB_NOT_NULL(left));
if (OB_SUCC(ret)) {
if (!left->is_obj_type()) {
has_record_type = true;
}
if (!has_record_type) {
right = stmt->get_data_type(i);
} else {
const ObRawExpr *raw_expr = func.get_expr(stmt->get_into(i));
CK (return_type->get_record_member_count() == stmt->get_into().count());
CK (OB_NOT_NULL(raw_expr));
OV (raw_expr->get_result_type().is_ext(), OB_ERR_UNEXPECTED, KPC(raw_expr));
OX (right.set_meta_type(raw_expr->get_result_type()));
OX (right.set_accuracy(raw_expr->get_result_type().get_accuracy()));
}
}
if (OB_FAIL(ret)) {
} else if (left->is_obj_type() && !right.get_meta_type().is_ext()) {
if (right.get_meta_type().is_null()

View File

@ -2192,7 +2192,8 @@ public:
cursor_idx_(OB_INVALID_INDEX),
params_(allocator),
need_declare_(false),
user_type_(NULL) {}
user_type_(NULL),
expand_user_types_() {}
virtual ~ObPLCursorForLoopStmt() {}
@ -2261,6 +2262,8 @@ public:
user_type_ = user_type;
}
ObIArray<ObDataType> &get_expand_user_types() { return expand_user_types_; }
const ObIArray<ObDataType> &get_expand_user_types() const { return expand_user_types_; }
inline void set_need_declare(bool need) { need_declare_ = need; }
inline bool get_need_declare() const { return need_declare_; }
@ -2274,6 +2277,7 @@ private:
ObPLSEArray<int64_t> params_; //cursor的实参
bool need_declare_; // 是否是SQL语句, 需要在codegen时声明
const ObUserDefinedType *user_type_; // CURSOR返回值类型
ObSEArray<ObDataType, 8> expand_user_types_; //expand return type
};
class ObPLSqlStmt;
@ -2908,7 +2912,9 @@ public:
pkg_id_(OB_INVALID_ID),
routine_id_(OB_INVALID_ID),
idx_(common::OB_INVALID_INDEX),
limit_(INT64_MAX) {}
limit_(INT64_MAX),
user_type_(NULL),
expand_user_types_() {}
virtual ~ObPLFetchStmt() {}
int accept(ObPLStmtVisitor &visitor) const;
@ -2923,6 +2929,16 @@ public:
routine_id_ = routine_id;
idx_ = idx;
}
inline const ObUserDefinedType* get_user_type() const
{
return user_type_;
}
inline void set_user_type(const ObUserDefinedType *user_type)
{
user_type_ = user_type;
}
ObIArray<ObDataType> &get_expand_user_types() { return expand_user_types_; }
const ObIArray<ObDataType> &get_expand_user_types() const { return expand_user_types_; }
inline int64_t get_limit() const { return limit_; }
inline void set_limit(int64_t limit) { limit_ = limit; }
@ -2933,6 +2949,8 @@ private:
uint64_t routine_id_;
int64_t idx_; //symbol表里的下标
int64_t limit_; //INT64_MAX:是bulk fetch但是没有limit子句
const ObUserDefinedType *user_type_; // CURSOR返回值类型
ObSEArray<ObDataType, 8> expand_user_types_; //expand return type
};
class ObPLCloseStmt : public ObPLStmt

View File

@ -1814,44 +1814,55 @@ int ObSPIService::spi_build_record_type_by_result_set(common::ObIAllocator &allo
const sql::ObResultSet &result_set,
int64_t hidden_column_count,
ObRecordType *&record_type,
uint64_t &rowid_table_id)
uint64_t &rowid_table_id,
pl::ObPLBlockNS *secondary_namespace)
{
int ret = OB_SUCCESS;
const common::ColumnsFieldIArray *columns = result_set.get_field_columns();
if (OB_ISNULL(columns) || OB_ISNULL(record_type) || 0 == columns->count()) {
if (OB_ISNULL(columns) || OB_ISNULL(record_type) || 0 == columns->count() || OB_ISNULL(secondary_namespace)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("invalid argument", K(columns), K(record_type), K(ret));
} else {
int dup_idx = 0;
OZ (record_type->record_members_init(&allocator, columns->count() - hidden_column_count));
for (int64_t i = 0; OB_SUCC(ret) && i < columns->count() - hidden_column_count; ++i) {
ObDataType data_type;
ObPLDataType pl_type;
data_type.set_meta_type(columns->at(i).type_.get_meta());
data_type.set_accuracy(columns->at(i).accuracy_);
pl_type.set_data_type(data_type);
char* name_buf = NULL;
if (OB_ISNULL(name_buf = static_cast<char*>(allocator.alloc(columns->at(i).cname_.length() + 10)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc column name buf", K(ret), K(columns->at(i).cname_));
if (columns->at(i).type_.is_ext()) {
int64_t udt_id = columns->at(i).accuracy_.accuracy_;
const ObUserDefinedType *user_type = NULL;
OZ (secondary_namespace->get_pl_data_type_by_id(udt_id, user_type));
CK (OB_NOT_NULL(user_type));
OX (pl_type.set_user_type_id(user_type->get_type(), udt_id));
} else {
bool duplicate = false;
for (int64_t j = 0; OB_SUCC(ret) && j < columns->count() - hidden_column_count; ++j) {
if (i != j && columns->at(j).cname_ == columns->at(i).cname_) {
duplicate = true;
break;
}
}
if (duplicate) {
sprintf(name_buf, "%.*s&%d",
columns->at(i).cname_.length(), columns->at(i).cname_.ptr(), dup_idx);
dup_idx++;
ObDataType data_type;
data_type.set_meta_type(columns->at(i).type_.get_meta());
data_type.set_accuracy(columns->at(i).accuracy_);
pl_type.set_data_type(data_type);
}
if (OB_SUCC(ret)) {
char* name_buf = NULL;
if (OB_ISNULL(name_buf = static_cast<char*>(allocator.alloc(columns->at(i).cname_.length() + 10)))) {
ret = OB_ALLOCATE_MEMORY_FAILED;
LOG_WARN("failed to alloc column name buf", K(ret), K(columns->at(i).cname_));
} else {
sprintf(name_buf, "%.*s", columns->at(i).cname_.length(), columns->at(i).cname_.ptr());
}
ObString deep_copy_name(name_buf);
if (OB_FAIL(record_type->add_record_member(deep_copy_name, pl_type))) {
LOG_WARN("add record member failed", K(ret));
bool duplicate = false;
for (int64_t j = 0; OB_SUCC(ret) && j < columns->count() - hidden_column_count; ++j) {
if (i != j && columns->at(j).cname_ == columns->at(i).cname_) {
duplicate = true;
break;
}
}
if (duplicate) {
sprintf(name_buf, "%.*s&%d",
columns->at(i).cname_.length(), columns->at(i).cname_.ptr(), dup_idx);
dup_idx++;
} else {
sprintf(name_buf, "%.*s", columns->at(i).cname_.length(), columns->at(i).cname_.ptr());
}
ObString deep_copy_name(name_buf);
if (OB_FAIL(record_type->add_record_member(deep_copy_name, pl_type))) {
LOG_WARN("add record member failed", K(ret));
}
}
}
}
@ -1928,7 +1939,8 @@ int ObSPIService::spi_resolve_prepare(common::ObIAllocator &allocator,
inner_result->result_set(),
prepare_result.has_hidden_rowid_ ? 1 : 0,
prepare_result.record_type_,
prepare_result.rowid_table_id_));
prepare_result.rowid_table_id_,
secondary_namespace));
} else {
HEAP_VAR(ObMySQLProxy::MySQLResult, proxy_result) {
ObInnerSQLConnection *spi_conn = NULL;
@ -1956,7 +1968,8 @@ int ObSPIService::spi_resolve_prepare(common::ObIAllocator &allocator,
result->result_set(),
prepare_result.has_hidden_rowid_ ? 1 : 0,
prepare_result.record_type_,
prepare_result.rowid_table_id_));
prepare_result.rowid_table_id_,
secondary_namespace));
}
}
}
@ -3482,7 +3495,9 @@ int ObSPIService::do_cursor_fetch(ObPLExecCtx *ctx,
const bool *exprs_not_null_flag,
const int64_t *pl_integer_ranges,
bool is_bulk,
int64_t limit)
int64_t limit,
const ObDataType *return_types,
int64_t return_type_count)
{
int ret = OB_SUCCESS;
ObSPIResultSet *spi_result = NULL;
@ -3593,7 +3608,10 @@ int ObSPIService::do_cursor_fetch(ObPLExecCtx *ctx,
is_bulk, \
false/*is_dynamic_sql*/, \
true/*for_cursor*/, \
limit); \
false, \
limit, \
return_types, \
return_type_count); \
} else { \
ret = inner_fetch_with_retry(ctx, \
cursor->get_cursor_handler()->get_mysql_result().get_result(), \
@ -3609,7 +3627,9 @@ int ObSPIService::do_cursor_fetch(ObPLExecCtx *ctx,
is_bulk, \
true, \
limit, \
cursor->get_last_execute_time()); \
cursor->get_last_execute_time(), \
return_types, \
return_type_count); \
} \
} while(0)
@ -3706,7 +3726,9 @@ int ObSPIService::spi_cursor_fetch(ObPLExecCtx *ctx,
const bool *exprs_not_null_flag,
const int64_t *pl_integer_ranges,
bool is_bulk,
int64_t limit)
int64_t limit,
const ObDataType *return_types,
int64_t return_type_count)
{
int ret = OB_SUCCESS;
ObPLCursorInfo *cursor = NULL;
@ -3728,7 +3750,9 @@ int ObSPIService::spi_cursor_fetch(ObPLExecCtx *ctx,
exprs_not_null_flag,
pl_integer_ranges,
is_bulk,
limit));
limit,
return_types,
return_type_count));
if (OB_FAIL(ret) && lib::is_mysql_mode()) {
ctx->exec_ctx_->get_my_session()->set_show_warnings_buf(ret);
@ -4967,7 +4991,9 @@ int ObSPIService::inner_fetch(ObPLExecCtx *ctx,
ObNewRow *current_row,
bool has_hidden_rowid,
bool for_cursor,
int64_t limit)
int64_t limit,
const ObDataType *return_types,
int64_t return_type_count)
{
int ret = OB_SUCCESS;
CK (OB_NOT_NULL(ctx));
@ -4996,7 +5022,9 @@ int ObSPIService::inner_fetch(ObPLExecCtx *ctx,
is_dynamic_sql,
for_cursor,
is_forall,
limit)))) {
limit,
return_types,
return_type_count)))) {
if (can_retry) {
int cli_ret = OB_SUCCESS;
retry_ctrl.test_and_save_retry_state(
@ -5030,7 +5058,9 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx,
bool is_bulk,
bool for_cursor,
int64_t limit,
int64_t last_exec_time)
int64_t last_exec_time,
const ObDataType *return_types,
int64_t return_type_count)
{
int ret = OB_SUCCESS;
ObQueryRetryCtrl retry_ctrl;
@ -5097,7 +5127,9 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx,
&current_row,
has_hidden_rowid,
for_cursor,
limit);
limit,
return_types,
return_type_count);
}
// NOTE: cursor fetch failed can not retry, we only use this to refresh location cache.
} while (RETRY_TYPE_NONE != retry_ctrl.get_retry_type() && !for_cursor);
@ -5112,6 +5144,7 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx,
#define STORE_INTO_RESULT(singal_value) \
ObCastCtx cast_ctx(allocator, &dtc_params, cast_mode, cast_coll_type); \
if (into_count > 1) { /* 多个variables, 每个都是基础变量 */ \
CK(return_types != nullptr ? return_type_count == into_count : true); \
ObSEArray<ObObj, 1> tmp_result; \
ObSEArray<ObDataType, 1> tmp_desc; \
for (int64_t i = 0; OB_SUCC(ret) && i < actual_column_count; ++i) { \
@ -5122,7 +5155,9 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx,
OZ (store_result(ctx, into_exprs[i], &column_types[i], \
1, exprs_not_null + i, pl_integer_ranges + i, \
tmp_desc, is_strict, \
cast_ctx, tmp_result)); \
cast_ctx, tmp_result, \
return_types != nullptr ? &return_types[i] : nullptr, \
return_types != nullptr ? 1 : 0)); \
} \
} else { /* 单个record或单个variable */ \
ObSEArray<ObObj, OB_DEFAULT_SE_ARRAY_COUNT> tmp_result; \
@ -5152,7 +5187,7 @@ int ObSPIService::inner_fetch_with_retry(ObPLExecCtx *ctx,
OZ (store_result(ctx, into_exprs[0], column_types, \
type_count, exprs_not_null, pl_integer_ranges, \
row_desc, is_strict, \
cast_ctx, tmp_result)); \
cast_ctx, tmp_result, return_types, return_type_count)); \
}
/***************************************************************************************/
@ -5179,7 +5214,9 @@ int ObSPIService::get_result(ObPLExecCtx *ctx,
bool is_dynamic_sql,
bool for_cursor, //是否检查单行和notfound
bool is_forall,
int64_t limit) //INT64_MAX:无limit
int64_t limit, //INT64_MAX:无limit
const ObDataType *return_types,
int64_t return_type_count)
{
int ret = OB_SUCCESS;
ObIAllocator *allocator = ctx->allocator_;
@ -5626,6 +5663,109 @@ int ObSPIService::check_package_dest_and_deep_copy(
return ret;
}
int ObSPIService::convert_obj(ObPLExecCtx *ctx,
ObCastCtx &cast_ctx,
bool is_strict,
const ObSqlExpression *result_expr,
const ObIArray<ObDataType> &current_type,
ObIArray<ObObj> &obj_array,
const ObDataType *result_types,
int64_t type_count,
ObIArray<ObObj> &calc_array)
{
int ret = OB_SUCCESS;
CK(OB_NOT_NULL(ctx));
CK(OB_NOT_NULL(ctx->func_));
CK(OB_NOT_NULL(ctx->params_));
CK(OB_NOT_NULL(ctx->exec_ctx_));
CK(OB_NOT_NULL(ctx->exec_ctx_->get_my_session()));
CK(OB_NOT_NULL(result_types));
CK(OB_NOT_NULL(result_expr));
CK(current_type.count() == type_count);
CK(obj_array.count() == type_count);
ObObj tmp_obj;
for (int i = 0; OB_SUCC(ret) && i < obj_array.count(); ++i) {
ObObj &obj = obj_array.at(i);
tmp_obj.reset();
obj.set_collation_level(result_types[i].get_collation_level());
LOG_DEBUG("column convert", K(obj.get_meta()), K(result_types[i].get_meta_type()),
K(current_type.at(i)), K(result_types[i].get_accuracy()));
if (obj.get_meta() == result_types[i].get_meta_type()
&& (current_type.at(i).get_accuracy() == result_types[i].get_accuracy()
|| (result_types[i].get_meta_type().is_number() // NUMBER目标类型精度未知直接做赋值
&& PRECISION_UNKNOWN_YET == result_types[i].get_accuracy().get_precision()
&& ORA_NUMBER_SCALE_UNKNOWN_YET == result_types[i].get_accuracy().get_scale())
|| (result_types[i].get_meta_type().is_character_type() // CHAR/VARCHAR长度未知,直接赋值
&& -1 == result_types[i].get_accuracy().get_length()))) {
ObObj tmp_obj;
if (obj.is_pl_extend()) {
OZ (ObUserDefinedType::deep_copy_obj(*cast_ctx.allocator_v2_, obj, tmp_obj, true));
if (OB_SUCC(ret) && obj.get_meta().get_extend_type() != PL_CURSOR_TYPE) {
OZ (ObUserDefinedType::destruct_obj(obj, ctx->exec_ctx_->get_my_session()));
}
} else {
OZ (deep_copy_obj(*cast_ctx.allocator_v2_, obj, tmp_obj));
}
if (OB_SUCC(ret) && tmp_obj.get_meta().is_bit()) {
tmp_obj.set_scale(result_types[i].get_meta_type().get_scale());
}
OZ (calc_array.push_back(tmp_obj));
if (OB_SUCC(ret)) {
LOG_DEBUG("same type directyly copy", K(obj), K(tmp_obj), K(result_types[i]), K(i));
}
} else {
LOG_DEBUG("column convert", K(i), K(obj.get_meta()), K(result_types[i].get_meta_type()),
K(current_type.at(i)), K(result_types[i].get_accuracy()));
const ObIArray<ObString> *type_info = NULL;
if (ob_is_enum_or_set_type(result_types[i].get_obj_type())) {
if (!is_question_mark_expression(*result_expr)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("only can store to local enum set variables", K(ret));
} else {
int64_t param_idx = get_const_value(*result_expr).get_unknown();
if (param_idx >= ctx->func_->get_variables().count() || param_idx < 0) {
ret = OB_ARRAY_OUT_OF_RANGE;
LOG_WARN("param idx out of range", K(ret), K(param_idx));
} else {
type_info = &(ctx->func_->get_variables().at(param_idx).get_type_info());
}
}
}
ObExprResType result_type;
OX (result_type.set_meta(result_types[i].get_meta_type()));
OX (result_type.set_accuracy(result_types[i].get_accuracy()));
if (OB_SUCC(ret) && (result_type.is_blob() || result_type.is_blob_locator())
&& lib::is_oracle_mode()) {
cast_ctx.cast_mode_ |= CM_ENABLE_BLOB_CAST;
}
if (OB_FAIL(ret)) {
} else if (result_type.is_null() || result_type.is_unknown()) {
tmp_obj = obj;
} else {
OZ (ObExprColumnConv::convert_with_null_check(tmp_obj, obj, result_type, is_strict, cast_ctx, type_info));
if (OB_ERR_DATA_TOO_LONG == ret && lib::is_oracle_mode()) {
LOG_WARN("change error code to value error", K(ret));
ret = OB_ERR_NUMERIC_OR_VALUE_ERROR;
}
}
if (OB_SUCC(ret) && tmp_obj.need_deep_copy() && obj.get_string_ptr() == tmp_obj.get_string_ptr()) {
// obj may not deep copied in ObExprColumnConv::convert(), do deep copy it if needed.
ObObj tmp_obj2 = tmp_obj;
CK (OB_NOT_NULL(cast_ctx.allocator_v2_));
OZ (deep_copy_obj(*cast_ctx.allocator_v2_, tmp_obj2, tmp_obj));
}
OZ (calc_array.push_back(tmp_obj));
if (OB_SUCC(ret) && tmp_obj.get_meta().is_bit()) {
calc_array.at(i).set_scale(result_types[i].get_meta_type().get_scale());
}
}
}
return ret;
}
int ObSPIService::store_result(ObPLExecCtx *ctx,
const ObSqlExpression *result_expr,
const ObDataType *result_types,
@ -5635,11 +5775,14 @@ int ObSPIService::store_result(ObPLExecCtx *ctx,
const ObIArray<ObDataType> &row_desc,
bool is_strict,
ObCastCtx &cast_ctx,
ObIArray<ObObj> &obj_array)
ObIArray<ObObj> &obj_array,
const ObDataType *return_types,
int64_t return_type_count)
{
int ret = OB_SUCCESS;
bool need_convert_type = true;
ObSEArray<ObObj, OB_DEFAULT_SE_ARRAY_COUNT> tmp_obj_array;
ObSEArray<ObObj, OB_DEFAULT_SE_ARRAY_COUNT> tmp_obj_array1;
ObIArray<ObObj> *calc_array = &obj_array;
CK(OB_NOT_NULL(ctx));
CK(OB_NOT_NULL(ctx->func_));
@ -5673,90 +5816,35 @@ int ObSPIService::store_result(ObPLExecCtx *ctx,
K(ret));
}
}
// 做类型转换
if (OB_SUCC(ret) && need_convert_type) {
ObObj tmp_obj;
calc_array = &tmp_obj_array;
for (int i = 0; OB_SUCC(ret) && i < obj_array.count(); ++i) {
ObObj &obj = obj_array.at(i);
tmp_obj.reset();
obj.set_collation_level(result_types[i].get_collation_level());
LOG_DEBUG("column convert", K(obj.get_meta()), K(result_types[i].get_meta_type()),
K(row_desc.at(i)), K(result_types[i].get_accuracy()));
if (obj.get_meta() == result_types[i].get_meta_type()
&& (row_desc.at(i).get_accuracy() == result_types[i].get_accuracy()
|| (result_types[i].get_meta_type().is_number() // NUMBER目标类型精度未知直接做赋值
&& PRECISION_UNKNOWN_YET == result_types[i].get_accuracy().get_precision()
&& ORA_NUMBER_SCALE_UNKNOWN_YET == result_types[i].get_accuracy().get_scale())
|| (result_types[i].get_meta_type().is_character_type() // CHAR/VARCHAR长度未知,直接赋值
&& -1 == result_types[i].get_accuracy().get_length()))) {
ObObj tmp_obj;
if (obj.is_pl_extend()) {
OZ (ObUserDefinedType::deep_copy_obj(*cast_ctx.allocator_v2_, obj, tmp_obj, true));
/*
* Must deconstruct composite in ObObj
* */
if (OB_SUCC(ret) && obj.get_meta().get_extend_type() != PL_CURSOR_TYPE) {
OZ (ObUserDefinedType::destruct_obj(obj, ctx->exec_ctx_->get_my_session()));
}
} else {
OZ (deep_copy_obj(*cast_ctx.allocator_v2_, obj, tmp_obj));
}
if (OB_SUCC(ret) && tmp_obj.get_meta().is_bit()) {
tmp_obj.set_scale(result_types[i].get_meta_type().get_scale());
}
OZ (tmp_obj_array.push_back(tmp_obj));
if (OB_SUCC(ret)) {
LOG_DEBUG("same type directyly copy", K(obj), K(tmp_obj), K(result_types[i]), K(i));
}
} else {
LOG_DEBUG("column convert", K(i), K(obj.get_meta()), K(result_types[i].get_meta_type()),
K(row_desc.at(i)), K(result_types[i].get_accuracy()));
const ObIArray<ObString> *type_info = NULL;
if (ob_is_enum_or_set_type(result_types[i].get_obj_type())) {
if (!is_question_mark_expression(*result_expr)) {
ret = OB_NOT_SUPPORTED;
LOG_WARN("only can store to local enum set variables", K(ret));
LOG_USER_ERROR(OB_NOT_SUPPORTED, "store to not local enum set variables");
} else {
int64_t param_idx = get_const_value(*result_expr).get_unknown();
if (param_idx >= ctx->func_->get_variables().count() || param_idx < 0) {
ret = OB_ARRAY_OUT_OF_RANGE;
LOG_WARN("param idx out of range", K(ret), K(param_idx));
} else {
type_info = &(ctx->func_->get_variables().at(param_idx).get_type_info());
}
}
}
ObExprResType result_type;
OX (result_type.set_meta(result_types[i].get_meta_type()));
OX (result_type.set_accuracy(result_types[i].get_accuracy()));
if (OB_SUCC(ret) && (result_type.is_blob() || result_type.is_blob_locator())
&& lib::is_oracle_mode()) {
cast_ctx.cast_mode_ |= CM_ENABLE_BLOB_CAST;
}
if (OB_FAIL(ret)) {
} else if (result_type.is_null() || result_type.is_unknown()) {
tmp_obj = obj;
} else {
OZ (ObExprColumnConv::convert_with_null_check(tmp_obj, obj, result_type, is_strict, cast_ctx, type_info));
if (OB_ERR_DATA_TOO_LONG == ret && lib::is_oracle_mode()) {
LOG_WARN("change error code to value error", K(ret));
ret = OB_ERR_NUMERIC_OR_VALUE_ERROR;
}
}
if (OB_SUCC(ret) && tmp_obj.need_deep_copy() && obj.get_string_ptr() == tmp_obj.get_string_ptr()) {
// obj may not deep copied in ObExprColumnConv::convert(), do deep copy it if needed.
ObObj tmp_obj2 = tmp_obj;
CK (OB_NOT_NULL(cast_ctx.allocator_v2_));
OZ (deep_copy_obj(*cast_ctx.allocator_v2_, tmp_obj2, tmp_obj));
}
OZ (tmp_obj_array.push_back(tmp_obj));
if (OB_SUCC(ret) && tmp_obj.get_meta().is_bit()) {
tmp_obj_array.at(i).set_scale(result_types[i].get_meta_type().get_scale());
}
if (OB_FAIL(ret)) {
} else if (return_types != nullptr && row_desc.count() == obj_array.count()) {
OZ(convert_obj(ctx, cast_ctx, is_strict, result_expr, row_desc,
obj_array, return_types, return_type_count, tmp_obj_array));
OX(calc_array = &tmp_obj_array);
bool is_same = true;
for (int64_t i = 0; OB_SUCC(ret) && is_same && i < type_count; ++i) {
if (!(return_types[i] == result_types[i])) {
is_same = false;
}
}
if (OB_SUCC(ret) && !is_same && need_convert_type) {
ObSEArray<ObDataType, OB_DEFAULT_SE_ARRAY_COUNT> current_type;
for (int i = 0; OB_SUCC(ret) && i < return_type_count; ++i) {
OZ(current_type.push_back(return_types[i]));
}
OZ(convert_obj(ctx, cast_ctx, is_strict, result_expr, current_type,
tmp_obj_array, result_types, type_count, tmp_obj_array1));
OX(calc_array = &tmp_obj_array1);
}
} else if (need_convert_type) {
OZ(convert_obj(ctx, cast_ctx, is_strict, result_expr, row_desc,
obj_array, result_types, type_count, tmp_obj_array));
OX(calc_array = &tmp_obj_array);
}
// check range
for (int64_t i = 0; OB_SUCC(ret) && i < calc_array->count(); ++i) {

View File

@ -351,7 +351,9 @@ public:
const bool *exprs_not_null_flag,
const int64_t *pl_integer_rangs,
bool is_bulk,
int64_t limit);
int64_t limit,
const ObDataType *return_types,
int64_t return_type_count);
static int spi_cursor_close(pl::ObPLExecCtx *ctx,
uint64_t package_id,
uint64_t routine_id,
@ -431,7 +433,8 @@ public:
const sql::ObResultSet &result_set,
int64_t hidden_column_count,
pl::ObRecordType *&record_type,
uint64_t &rowid_table_id);
uint64_t &rowid_table_id,
pl::ObPLBlockNS *secondary_namespace);
static int spi_construct_collection(
pl::ObPLExecCtx *ctx, uint64_t package_id, ObObjParam *result);
@ -661,7 +664,9 @@ private:
ObNewRow *current_row = NULL,
bool has_hidden_rowid = false,
bool for_cursor = false,
int64_t limit = INT64_MAX);
int64_t limit = INT64_MAX,
const ObDataType *return_types = nullptr,
int64_t return_type_count = 0);
static int inner_fetch_with_retry(
pl::ObPLExecCtx *ctx,
sqlclient::ObMySQLResult *result_set,
@ -677,7 +682,19 @@ private:
bool is_bulk,
bool for_cursor,
int64_t limit,
int64_t last_exec_time);
int64_t last_exec_time,
const ObDataType *return_types = nullptr,
int64_t return_type_count = 0);
static int convert_obj(pl::ObPLExecCtx *ctx,
ObCastCtx &cast_ctx,
bool is_strict,
const ObSqlExpression *result_expr,
const ObIArray<ObDataType> &current_type,
ObIArray<ObObj> &obj_array,
const ObDataType *trans_type,
int64_t trans_type_count,
ObIArray<ObObj> &calc_array);
static int get_result(pl::ObPLExecCtx *ctx,
void *result_set,
@ -697,7 +714,9 @@ private:
bool is_dynamic_sql = false,
bool for_cursor = false,
bool is_forall = false,
int64_t limit = INT64_MAX);
int64_t limit = INT64_MAX,
const ObDataType *return_types = nullptr,
int64_t return_type_count = 0);
static int fetch_row(void *result_set,
bool is_streaming,
@ -721,7 +740,9 @@ private:
const ObIArray<ObDataType> &row_desc,
bool is_strict,
ObCastCtx &cast_ctx,
ObIArray<ObObj> &obj_array);
ObIArray<ObObj> &obj_array,
const ObDataType *return_types,
int64_t return_type_count);
static int store_result(ObIArray<pl::ObPLCollection*> &bulk_tables,
int64_t row_count,
@ -797,7 +818,9 @@ private:
const bool *exprs_not_null_flag,
const int64_t *pl_integer_ranges,
bool is_bulk,
int64_t limit);
int64_t limit,
const ObDataType *return_types = nullptr,
int64_t return_type_count = 0);
static int check_package_dest_and_deep_copy(pl::ObPLExecCtx &ctx,
const ObSqlExpression &expr,