diff --git a/src/pl/sys_package/ob_dbms_sql.cpp b/src/pl/sys_package/ob_dbms_sql.cpp index d43821a0ac..bc974d6aff 100644 --- a/src/pl/sys_package/ob_dbms_sql.cpp +++ b/src/pl/sys_package/ob_dbms_sql.cpp @@ -18,6 +18,7 @@ #include "sql/resolver/ob_resolver_utils.h" #include "sql/parser/ob_parser.h" #include "sql/ob_spi.h" +#include "sql/engine/expr/ob_expr_pl_associative_index.h" namespace oceanbase { @@ -445,62 +446,25 @@ int ObDbmsInfo::column_value(sql::ObSQLSessionInfo *session, ObObjParam obj; ObObj key(ObInt32Type); ObExprResType element_type; - // using reset mode instead of append mode after re-DEFINE_ARRAY - bool reset_mode = desc->lower_bnd_ > 1 ? desc->cur_idx_ == desc->lower_bnd_ - 1 ? true : false - : 0 == desc->cur_idx_ ? true : false;; - // always extend fetch_rows_.count() in append mode, and only extend the gap in reset mode - /* spi_extend_assoc_array 里面的逻辑是这样的 - * if table->get_key != NULL, 在 原来table 基础上扩展 extend_size 的 key. - * key的内存是从新分配的, 长度是 table->get_count + extend_size - * else , 直接分配 extend_size 的 key - */ - int64_t extend_size = reset_mode - ? NULL != table->get_key() && NULL != table->get_sort() - ? fetch_rows_.count() + desc->cur_idx_ - table->get_count() - : fetch_rows_.count() + desc->cur_idx_ - : fetch_rows_.count(); - if (extend_size > 0) { - OZ (ObSPIService::spi_extend_assoc_array(session->get_effective_tenant_id(), - NULL, - *allocator, - *table, - extend_size)); - } - for (int64_t i = 0; OB_SUCC(ret) && i < extend_size - fetch_rows_.count(); ++i) { - OX (new (reinterpret_cast(table->get_data()) + i) ObObj(ObMaxType)); - } + ObExprPLAssocIndex::Info info; + OX (info.for_write_ = true); OX (element_type.set_meta(desc->type_.get_meta_type())); OX (element_type.set_accuracy(desc->type_.get_accuracy())); for (int64_t i = 0; OB_SUCC(ret) && i < fetch_rows_.count(); ++i) { - // check `table->get_count() > desc->cur_idx_` - // prevent the `table->get_data()[desc->cur_idx_]` is invalid - - /* when lower_bnd_ > 1 - * cur_idx_ will begin from lower_bnd_ - 1, the key is desc->cur_idx_ + 1 - * else - * cur_id_ will begin from 0, he key is desc->lower_bnd_ + desc->cur_idx_ - */ - if (table->get_count() > desc->cur_idx_) { - ObNewRow &row = fetch_rows_.at(i); - ObObjParam src = row.get_cell(col_idx); - OZ (ObSPIService::spi_convert(session, table->get_allocator(), src, element_type, obj)); - OZ (deep_copy_obj(*table->get_allocator(), - obj, - reinterpret_cast(table->get_data())[desc->cur_idx_])); - OX (key.set_int32(desc->lower_bnd_ > 1 ? desc->cur_idx_ + 1 : desc->lower_bnd_ + desc->cur_idx_)); - CK (OB_NOT_NULL(table->get_key())); - CK (OB_NOT_NULL(table->get_sort())); - OX (table->set_key(desc->cur_idx_, key)); - OX (table->set_sort(desc->cur_idx_, desc->cur_idx_ + 1)); - LOG_DEBUG("column add key ", K(key.get_int32()), K(desc->cur_idx_), K(table->get_key(desc->cur_idx_)->get_int32()), K(table->get_sort(desc->cur_idx_))); - OX (++desc->cur_idx_); - } + int64_t index = OB_INVALID_INDEX; + ObNewRow &row = fetch_rows_.at(i); + ObObjParam src = row.get_cell(col_idx); + OX (key.set_int32(desc->lower_bnd_ + desc->cur_idx_)); + OZ (ObExprPLAssocIndex::do_eval_assoc_index(index, session, info, *table, key, *allocator)); + CK(table->get_count() >= index); + OZ (ObSPIService::spi_convert(session, table->get_allocator(), src, element_type, obj)); + OZ (deep_copy_obj(*table->get_allocator(), + obj, + reinterpret_cast(table->get_data())[index-1])); + LOG_DEBUG("column add key ", K(col_idx), K(index), K(desc->cur_idx_), K(desc->lower_bnd_), K(key.get_int32()), K(table->get_key(index-1)->get_int32())); + OX (++desc->cur_idx_); } - - OX (table->set_first(reset_mode ? desc->lower_bnd_ > 1 ? desc->lower_bnd_ : 1 - : table->get_first())); - OX (table->set_last(table->get_first() - 1 + table->get_actual_count())); - LOG_DEBUG("column value set last and first", K(reset_mode), K(desc->lower_bnd_), K(table->get_first()), K(table->get_actual_count()), K(table->get_last())); + LOG_DEBUG("column value set last and first", K(desc->lower_bnd_), K(table->get_first()), K(table->get_actual_count()), K(table->get_last())); } else { ret = OB_ERR_INVALID_TYPE_FOR_ARGUMENT; LOG_WARN("type of out argument must match type of column or bind variable", @@ -1141,7 +1105,7 @@ int ObPLDbmsSql::execute(ObExecContext &exec_ctx, ParamStore ¶ms, ObObj &res OB_SUCC(ret) && iter != cursor->get_define_arrays().end(); ++iter) { ObDbmsCursorInfo::ArrayDesc &array_info = iter->second; - array_info.cur_idx_ = array_info.lower_bnd_ > 1 ? array_info.lower_bnd_ - 1 : 0; + array_info.cur_idx_ = 0; } } else { number::ObNumber num; @@ -1569,10 +1533,12 @@ int ObPLDbmsSql::execute_and_fetch(ObExecContext &exec_ctx, ParamStore ¶ms, OB_SUCC(ret) && iter != cursor->get_define_arrays().end(); ++iter) { ObDbmsCursorInfo::ArrayDesc &array_info = iter->second; - array_info.cur_idx_ = array_info.lower_bnd_ > 1 ? array_info.lower_bnd_ - 1 : 0; + array_info.cur_idx_ = 0; } } - if (OB_SUCC(ret) && !cursor->isopen() + // todo : when dbms_cursor support stream cursor need change here + if (OB_SUCC(ret) && (!cursor->isopen() + || (cursor->isopen() && cursor->get_rowcount() == cursor->get_spi_cursor()->cur_)) && !check_stmt_need_to_be_executed_when_parsing(*cursor)) { // do execute. has_open = false; diff --git a/src/sql/engine/expr/ob_expr_pl_associative_index.cpp b/src/sql/engine/expr/ob_expr_pl_associative_index.cpp index 7203e3864b..08beb506cd 100644 --- a/src/sql/engine/expr/ob_expr_pl_associative_index.cpp +++ b/src/sql/engine/expr/ob_expr_pl_associative_index.cpp @@ -78,6 +78,21 @@ int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx, const Info &info, pl::ObPLAssocArray &assoc_array_ref, const common::ObObj &key) +{ + return do_eval_assoc_index(assoc_idx, + exec_ctx.get_my_session(), + info, + assoc_array_ref, + key, + exec_ctx.get_allocator()); +} + +int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx, + ObSQLSessionInfo *session, + const Info &info, + pl::ObPLAssocArray &assoc_array_ref, + const common::ObObj &key, + ObIAllocator &allocator) { int ret = OB_SUCCESS; pl::ObPLAssocArray *assoc_array = &assoc_array_ref; @@ -110,9 +125,9 @@ int ObExprPLAssocIndex::do_eval_assoc_index(int64_t &assoc_idx, if (OB_SUCC(ret)) { if (info.for_write_) { if (OB_INVALID_INDEX == index) { - pl::ObPLExecCtx *pl_exec_ctx = exec_ctx.get_my_session()->get_pl_context()->get_current_ctx(); - if (OB_FAIL(ObSPIService::spi_extend_assoc_array(exec_ctx.get_my_session()->get_effective_tenant_id(), - pl_exec_ctx, exec_ctx.get_allocator(), *assoc_array, 1))) { + pl::ObPLExecCtx *pl_exec_ctx = session->get_pl_context()->get_current_ctx(); + if (OB_FAIL(ObSPIService::spi_extend_assoc_array(session->get_effective_tenant_id(), + pl_exec_ctx, allocator, *assoc_array, 1))) { LOG_WARN("failed to spi_set_collection_data", K(*assoc_array), K(ret)); } } diff --git a/src/sql/engine/expr/ob_expr_pl_associative_index.h b/src/sql/engine/expr/ob_expr_pl_associative_index.h index 377e3c7fdc..1e0bc687b7 100644 --- a/src/sql/engine/expr/ob_expr_pl_associative_index.h +++ b/src/sql/engine/expr/ob_expr_pl_associative_index.h @@ -52,7 +52,7 @@ public: virtual int cg_expr(ObExprCGCtx &op_cg_ctx, const ObRawExpr &raw_expr, ObExpr &rt_expr) const override; static int eval_assoc_idx(const ObExpr &expr, ObEvalCtx &ctx, ObDatum &expr_datum); -private: + struct Info { Info() : for_write_(false), @@ -83,11 +83,17 @@ private: const Info &info, pl::ObPLAssocArray &assoc_array, const common::ObObj &key); + static int do_eval_assoc_index(int64_t &assoc_idx, + ObSQLSessionInfo *session, + const Info &info, + pl::ObPLAssocArray &assoc_array_ref, + const common::ObObj &key, + ObIAllocator &allocator); static int reserve_assoc_key(pl::ObPLAssocArray &assoc_array); #endif DISALLOW_COPY_AND_ASSIGN(ObExprPLAssocIndex); -private: + Info info_; };