[to #50702943]fixed some define_array/execute_and_fetch bug
This commit is contained in:
@ -18,6 +18,7 @@
|
||||
#include "sql/resolver/ob_resolver_utils.h"
|
||||
#include "sql/parser/ob_parser.h"
|
||||
#include "sql/ob_spi.h"
|
||||
#include "sql/engine/expr/ob_expr_pl_associative_index.h"
|
||||
|
||||
namespace oceanbase
|
||||
{
|
||||
@ -445,62 +446,25 @@ int ObDbmsInfo::column_value(sql::ObSQLSessionInfo *session,
|
||||
ObObjParam obj;
|
||||
ObObj key(ObInt32Type);
|
||||
ObExprResType element_type;
|
||||
// using reset mode instead of append mode after re-DEFINE_ARRAY
|
||||
bool reset_mode = desc->lower_bnd_ > 1 ? desc->cur_idx_ == desc->lower_bnd_ - 1 ? true : false
|
||||
: 0 == desc->cur_idx_ ? true : false;;
|
||||
// always extend fetch_rows_.count() in append mode, and only extend the gap in reset mode
|
||||
/* spi_extend_assoc_array 里面的逻辑是这样的
|
||||
* if table->get_key != NULL, 在 原来table 基础上扩展 extend_size 的 key.
|
||||
* key的内存是从新分配的, 长度是 table->get_count + extend_size
|
||||
* else , 直接分配 extend_size 的 key
|
||||
*/
|
||||
int64_t extend_size = reset_mode
|
||||
? NULL != table->get_key() && NULL != table->get_sort()
|
||||
? fetch_rows_.count() + desc->cur_idx_ - table->get_count()
|
||||
: fetch_rows_.count() + desc->cur_idx_
|
||||
: fetch_rows_.count();
|
||||
if (extend_size > 0) {
|
||||
OZ (ObSPIService::spi_extend_assoc_array(session->get_effective_tenant_id(),
|
||||
NULL,
|
||||
*allocator,
|
||||
*table,
|
||||
extend_size));
|
||||
}
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < extend_size - fetch_rows_.count(); ++i) {
|
||||
OX (new (reinterpret_cast<ObObj*>(table->get_data()) + i) ObObj(ObMaxType));
|
||||
}
|
||||
ObExprPLAssocIndex::Info info;
|
||||
OX (info.for_write_ = true);
|
||||
OX (element_type.set_meta(desc->type_.get_meta_type()));
|
||||
OX (element_type.set_accuracy(desc->type_.get_accuracy()));
|
||||
for (int64_t i = 0; OB_SUCC(ret) && i < fetch_rows_.count(); ++i) {
|
||||
// check `table->get_count() > desc->cur_idx_`
|
||||
// prevent the `table->get_data()[desc->cur_idx_]` is invalid
|
||||
|
||||
/* when lower_bnd_ > 1
|
||||
* cur_idx_ will begin from lower_bnd_ - 1, the key is desc->cur_idx_ + 1
|
||||
* else
|
||||
* cur_id_ will begin from 0, he key is desc->lower_bnd_ + desc->cur_idx_
|
||||
*/
|
||||
if (table->get_count() > desc->cur_idx_) {
|
||||
int64_t index = OB_INVALID_INDEX;
|
||||
ObNewRow &row = fetch_rows_.at(i);
|
||||
ObObjParam src = row.get_cell(col_idx);
|
||||
OX (key.set_int32(desc->lower_bnd_ + desc->cur_idx_));
|
||||
OZ (ObExprPLAssocIndex::do_eval_assoc_index(index, session, info, *table, key, *allocator));
|
||||
CK(table->get_count() >= index);
|
||||
OZ (ObSPIService::spi_convert(session, table->get_allocator(), src, element_type, obj));
|
||||
OZ (deep_copy_obj(*table->get_allocator(),
|
||||
obj,
|
||||
reinterpret_cast<ObObj*>(table->get_data())[desc->cur_idx_]));
|
||||
OX (key.set_int32(desc->lower_bnd_ > 1 ? desc->cur_idx_ + 1 : desc->lower_bnd_ + desc->cur_idx_));
|
||||
CK (OB_NOT_NULL(table->get_key()));
|
||||
CK (OB_NOT_NULL(table->get_sort()));
|
||||
OX (table->set_key(desc->cur_idx_, key));
|
||||
OX (table->set_sort(desc->cur_idx_, desc->cur_idx_ + 1));
|
||||
LOG_DEBUG("column add key ", K(key.get_int32()), K(desc->cur_idx_), K(table->get_key(desc->cur_idx_)->get_int32()), K(table->get_sort(desc->cur_idx_)));
|
||||
reinterpret_cast<ObObj*>(table->get_data())[index-1]));
|
||||
LOG_DEBUG("column add key ", K(col_idx), K(index), K(desc->cur_idx_), K(desc->lower_bnd_), K(key.get_int32()), K(table->get_key(index-1)->get_int32()));
|
||||
OX (++desc->cur_idx_);
|
||||
}
|
||||
}
|
||||
|
||||
OX (table->set_first(reset_mode ? desc->lower_bnd_ > 1 ? desc->lower_bnd_ : 1
|
||||
: table->get_first()));
|
||||
OX (table->set_last(table->get_first() - 1 + table->get_actual_count()));
|
||||
LOG_DEBUG("column value set last and first", K(reset_mode), K(desc->lower_bnd_), K(table->get_first()), K(table->get_actual_count()), K(table->get_last()));
|
||||
LOG_DEBUG("column value set last and first", K(desc->lower_bnd_), K(table->get_first()), K(table->get_actual_count()), K(table->get_last()));
|
||||
} else {
|
||||
ret = OB_ERR_INVALID_TYPE_FOR_ARGUMENT;
|
||||
LOG_WARN("type of out argument must match type of column or bind variable",
|
||||
@ -1141,7 +1105,7 @@ int ObPLDbmsSql::execute(ObExecContext &exec_ctx, ParamStore ¶ms, ObObj &res
|
||||
OB_SUCC(ret) && iter != cursor->get_define_arrays().end();
|
||||
++iter) {
|
||||
ObDbmsCursorInfo::ArrayDesc &array_info = iter->second;
|
||||
array_info.cur_idx_ = array_info.lower_bnd_ > 1 ? array_info.lower_bnd_ - 1 : 0;
|
||||
array_info.cur_idx_ = 0;
|
||||
}
|
||||
} else {
|
||||
number::ObNumber num;
|
||||
@ -1569,10 +1533,12 @@ int ObPLDbmsSql::execute_and_fetch(ObExecContext &exec_ctx, ParamStore ¶ms,
|
||||
OB_SUCC(ret) && iter != cursor->get_define_arrays().end();
|
||||
++iter) {
|
||||
ObDbmsCursorInfo::ArrayDesc &array_info = iter->second;
|
||||
array_info.cur_idx_ = array_info.lower_bnd_ > 1 ? array_info.lower_bnd_ - 1 : 0;
|
||||
array_info.cur_idx_ = 0;
|
||||
}
|
||||
}
|
||||
if (OB_SUCC(ret) && !cursor->isopen()
|
||||
// todo : when dbms_cursor support stream cursor need change here
|
||||
if (OB_SUCC(ret) && (!cursor->isopen()
|
||||
|| (cursor->isopen() && cursor->get_rowcount() == cursor->get_spi_cursor()->cur_))
|
||||
&& !check_stmt_need_to_be_executed_when_parsing(*cursor)) {
|
||||
// do execute.
|
||||
has_open = false;
|
||||
|
||||
@ -78,6 +78,21 @@ int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx,
|
||||
const Info &info,
|
||||
pl::ObPLAssocArray &assoc_array_ref,
|
||||
const common::ObObj &key)
|
||||
{
|
||||
return do_eval_assoc_index(assoc_idx,
|
||||
exec_ctx.get_my_session(),
|
||||
info,
|
||||
assoc_array_ref,
|
||||
key,
|
||||
exec_ctx.get_allocator());
|
||||
}
|
||||
|
||||
int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx,
|
||||
ObSQLSessionInfo *session,
|
||||
const Info &info,
|
||||
pl::ObPLAssocArray &assoc_array_ref,
|
||||
const common::ObObj &key,
|
||||
ObIAllocator &allocator)
|
||||
{
|
||||
int ret = OB_SUCCESS;
|
||||
pl::ObPLAssocArray *assoc_array = &assoc_array_ref;
|
||||
@ -110,9 +125,9 @@ int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx,
|
||||
if (OB_SUCC(ret)) {
|
||||
if (info.for_write_) {
|
||||
if (OB_INVALID_INDEX == index) {
|
||||
pl::ObPLExecCtx *pl_exec_ctx = exec_ctx.get_my_session()->get_pl_context()->get_current_ctx();
|
||||
if (OB_FAIL(ObSPIService::spi_extend_assoc_array(exec_ctx.get_my_session()->get_effective_tenant_id(),
|
||||
pl_exec_ctx, exec_ctx.get_allocator(), *assoc_array, 1))) {
|
||||
pl::ObPLExecCtx *pl_exec_ctx = session->get_pl_context()->get_current_ctx();
|
||||
if (OB_FAIL(ObSPIService::spi_extend_assoc_array(session->get_effective_tenant_id(),
|
||||
pl_exec_ctx, allocator, *assoc_array, 1))) {
|
||||
LOG_WARN("failed to spi_set_collection_data", K(*assoc_array), K(ret));
|
||||
}
|
||||
}
|
||||
|
||||
@ -52,7 +52,7 @@ public:
|
||||
virtual int cg_expr(ObExprCGCtx &op_cg_ctx,
|
||||
const ObRawExpr &raw_expr, ObExpr &rt_expr) const override;
|
||||
static int eval_assoc_idx(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &expr_datum);
|
||||
private:
|
||||
|
||||
struct Info {
|
||||
Info()
|
||||
: for_write_(false),
|
||||
@ -83,11 +83,17 @@ private:
|
||||
const Info &info,
|
||||
pl::ObPLAssocArray &assoc_array,
|
||||
const common::ObObj &key);
|
||||
static int do_eval_assoc_index(int64_t &assoc_idx,
|
||||
ObSQLSessionInfo *session,
|
||||
const Info &info,
|
||||
pl::ObPLAssocArray &assoc_array_ref,
|
||||
const common::ObObj &key,
|
||||
ObIAllocator &allocator);
|
||||
|
||||
static int reserve_assoc_key(pl::ObPLAssocArray &assoc_array);
|
||||
#endif
|
||||
DISALLOW_COPY_AND_ASSIGN(ObExprPLAssocIndex);
|
||||
private:
|
||||
|
||||
Info info_;
|
||||
};
|
||||
|
||||
|
||||
Reference in New Issue
Block a user