diff --git a/src/pl/ob_pl_code_generator.cpp b/src/pl/ob_pl_code_generator.cpp index f62ad79e94..951251a6b9 100644 --- a/src/pl/ob_pl_code_generator.cpp +++ b/src/pl/ob_pl_code_generator.cpp @@ -4021,6 +4021,7 @@ int ObPLCodeGenerator::init_spi_service() OZ (arg_types.push_back(int_pointer_type));//formal_param_idxs OZ (arg_types.push_back(int_pointer_type));//actual_param_exprs OZ (arg_types.push_back(int64_type));//cursor_param_count + OZ (arg_types.push_back(bool_type));//skip_locked OZ (ObLLVMFunctionType::get(int32_type, arg_types, ft)); OZ (helper_.create_function(ObString("spi_cursor_open"), ft, spi_service_.spi_cursor_open_)); } @@ -4909,10 +4910,11 @@ int ObPLCodeGenerator::generate_open( ObLLVMValue formal_params; ObLLVMValue actual_params; ObLLVMValue cursor_param_count; + ObLLVMValue skip_locked; ObLLVMValue ret_err; OZ (args.push_back(get_vars().at(CTX_IDX))); OZ (generate_sql(cursor_sql, str, len, ps_sql, type, for_update, hidden_rowid, sql_params, - sql_param_count)); + sql_param_count, skip_locked)); OZ (args.push_back(str)); OZ (args.push_back(ps_sql)); OZ (args.push_back(type)); @@ -4944,6 +4946,7 @@ int ObPLCodeGenerator::generate_open( OZ (args.push_back(formal_params)); OZ (args.push_back(actual_params)); OZ (args.push_back(cursor_param_count)); + OZ (args.push_back(skip_locked)); OZ (get_helper().create_call(ObString("spi_cursor_open"), get_spi_service().spi_cursor_open_, args, ret_err)); OZ (check_success(ret_err, s.get_stmt_id(), s.get_block()->in_notfound(), s.get_block()->in_warning())); } @@ -5568,8 +5571,9 @@ int ObPLCodeGenerator::generate_sql(const ObPLSqlStmt &s, ObLLVMValue &ret_err) ObLLVMValue hidden_rowid; ObLLVMValue params; ObLLVMValue count, is_type_record; + ObLLVMValue skip_locked; OZ (args.push_back(get_vars().at(CTX_IDX))); - OZ (generate_sql(s, str, len, ps_sql, type, for_update, hidden_rowid, params, count)); + OZ (generate_sql(s, str, len, ps_sql, type, for_update, hidden_rowid, params, count, skip_locked)); if (OB_SUCC(ret)) { if (s.get_params().empty()) { OZ (args.push_back(str)); @@ -6786,7 +6790,8 @@ int ObPLCodeGenerator::generate_sql(const ObPLSql &sql, jit::ObLLVMValue &for_update, jit::ObLLVMValue &hidden_rowid, jit::ObLLVMValue ¶ms, - jit::ObLLVMValue &count) + jit::ObLLVMValue &count, + jit::ObLLVMValue &skip_locked) { int ret = OB_SUCCESS; ObLLVMValue int_value; @@ -6812,6 +6817,8 @@ int ObPLCodeGenerator::generate_sql(const ObPLSql &sql, } else if (OB_FAIL(generate_expression_array( sql.is_forall_sql() ? sql.get_array_binding_params() : sql.get_params(), params, count))) { LOG_WARN("get precalc expr array ir value failed", K(ret)); + } else if (OB_FAIL(helper_.get_int8(sql.is_skip_locked(), skip_locked))) { + LOG_WARN("failed to get int8", K(ret)); } else { /*do nothing*/ } } return ret; diff --git a/src/pl/ob_pl_code_generator.h b/src/pl/ob_pl_code_generator.h index 483d048bac..9bd0899afd 100644 --- a/src/pl/ob_pl_code_generator.h +++ b/src/pl/ob_pl_code_generator.h @@ -225,7 +225,8 @@ public: jit::ObLLVMValue &for_update, jit::ObLLVMValue &hidden_rowid, jit::ObLLVMValue ¶ms, - jit::ObLLVMValue &count); + jit::ObLLVMValue &count, + jit::ObLLVMValue &skip_locked); int generate_into(const ObPLInto &into, jit::ObLLVMValue &into_array_value, jit::ObLLVMValue &into_count_value, diff --git a/src/pl/ob_pl_resolver.cpp b/src/pl/ob_pl_resolver.cpp index fe6576f6af..84fcb2fa99 100644 --- a/src/pl/ob_pl_resolver.cpp +++ b/src/pl/ob_pl_resolver.cpp @@ -5682,6 +5682,7 @@ int ObPLResolver::resolve_static_sql(const ObStmtNodeTree *parse_tree, ObPLSql & static_sql.set_for_update(prepare_result.for_update_); static_sql.set_hidden_rowid(prepare_result.has_hidden_rowid_); static_sql.set_link_table(prepare_result.has_link_table_); + static_sql.set_skip_locked(prepare_result.is_skip_locked_); } } @@ -7572,7 +7573,8 @@ int ObPLResolver::resolve_cursor_def(const ObString &cursor_name, formal_params, ObPLCursor::DEFINED, prepare_result.has_dup_column_name_, - index))) { + index, + prepare_result.is_skip_locked_))) { LOG_WARN("failed to add cursor to symbol table", K(cursor_name), K(sql_node->str_value_), diff --git a/src/pl/ob_pl_stmt.cpp b/src/pl/ob_pl_stmt.cpp index a1ec27c886..29cfd31a1c 100644 --- a/src/pl/ob_pl_stmt.cpp +++ b/src/pl/ob_pl_stmt.cpp @@ -316,7 +316,8 @@ int ObPLCursorTable::add_cursor(uint64_t pkg_id, const ObPLDataType& cursor_type, // cursor返回值类型(record) const common::ObIArray &formal_params, //cursor的形参 ObPLCursor::CursorState state, - bool has_dup_column_name) + bool has_dup_column_name, + bool skip_locked) { int ret = OB_SUCCESS; // CK (OB_LIKELY(cursors_.count() < FUNC_MAX_CURSORS)); @@ -339,6 +340,7 @@ int ObPLCursorTable::add_cursor(uint64_t pkg_id, cursor->set_cursor_type(cursor_type); cursor->set_state(state); cursor->set_rowid_table_id(rowid_table_id); + cursor->set_skip_locked(skip_locked); if (has_dup_column_name) { cursor->set_dup_column(); } @@ -1138,7 +1140,8 @@ int ObPLBlockNS::add_cursor(const ObString &name, const common::ObIArray &formal_params, ObPLCursor::CursorState state, bool has_dup_column_name, - int64_t &index) + int64_t &index, + bool skip_locked) { int ret = OB_SUCCESS; bool is_dup = false; @@ -1170,7 +1173,8 @@ int ObPLBlockNS::add_cursor(const ObString &name, cursor_type, formal_params, state, - has_dup_column_name))) { + has_dup_column_name, + skip_locked))) { LOG_WARN("failed to add condition to condition table", K(ret)); } else { index = cursors_.at(cursors_.count() - 1); diff --git a/src/pl/ob_pl_stmt.h b/src/pl/ob_pl_stmt.h index 87fa020ef4..89ad9f82dd 100644 --- a/src/pl/ob_pl_stmt.h +++ b/src/pl/ob_pl_stmt.h @@ -475,7 +475,8 @@ public: row_desc_(NULL), rowid_table_id_(OB_INVALID_ID), ps_sql_(), - is_link_table_(false) {} + is_link_table_(false), + is_skip_locked_(false) {} virtual ~ObPLSql() {} inline const common::ObString &get_sql() const { return sql_; } @@ -506,7 +507,10 @@ public: inline void set_link_table(bool is_link_table) { is_link_table_ = is_link_table; } inline bool has_link_table() const { return is_link_table_; } - TO_STRING_KV(K_(sql), K_(params), K_(ps_sql), K_(stmt_type), K_(ref_objects), K_(rowid_table_id)); + inline void set_skip_locked(bool is_skip_locked) { is_skip_locked_ = is_skip_locked; } + inline bool is_skip_locked() const { return is_skip_locked_; } + + TO_STRING_KV(K_(sql), K_(params), K_(ps_sql), K_(stmt_type), K_(ref_objects), K_(rowid_table_id), K_(is_skip_locked)); protected: bool forall_sql_; @@ -521,6 +525,7 @@ protected: uint64_t rowid_table_id_; common::ObString ps_sql_; bool is_link_table_; + bool is_skip_locked_; }; class ObPLCursor @@ -572,6 +577,8 @@ public: inline bool is_for_update() const { return value_.is_for_update(); } inline void set_hidden_rowid(bool has_hidden_rowid) { value_.set_hidden_rowid(has_hidden_rowid); } inline bool has_hidden_rowid() const { return value_.has_hidden_rowid(); } + inline void set_skip_locked(bool is_skip_locked) { value_.set_skip_locked(is_skip_locked); } + inline bool is_skip_locked() { return value_.is_skip_locked(); } inline const common::ObIArray &get_ref_objects() const { return value_.get_ref_objects(); } inline int set_ref_objects(const common::ObIArray &ref_objects) { return value_.set_ref_objects(ref_objects); } inline void set_row_desc(const ObRecordType* row_desc) { value_.set_row_desc(row_desc); } @@ -644,7 +651,8 @@ public: const ObPLDataType &cursor_type, const common::ObIArray &formal_params, ObPLCursor::CursorState state = ObPLCursor::DEFINED, - bool has_dup_column_name = false); + bool has_dup_column_name = false, + bool skip_locked = false); TO_STRING_KV(K_(cursors)); @@ -1351,7 +1359,8 @@ public: const common::ObIArray &formal_params, ObPLCursor::CursorState state, bool has_dup_column_name, - int64_t &index); + int64_t &index, + bool skip_locked = false); int add_questionmark_cursor(const int64_t symbol_idx); inline const common::ObIArray *get_exprs() const { return exprs_; } inline void set_exprs(common::ObIArray *exprs) { exprs_ = exprs; } diff --git a/src/pl/sys_package/ob_dbms_sql.cpp b/src/pl/sys_package/ob_dbms_sql.cpp index 9b60dc41ba..3424242270 100644 --- a/src/pl/sys_package/ob_dbms_sql.cpp +++ b/src/pl/sys_package/ob_dbms_sql.cpp @@ -847,6 +847,7 @@ int ObPLDbmsSql::do_parse(ObExecContext &exec_ctx, ObDbmsCursorInfo *cursor, ObS bool for_update = false; bool hidden_rowid = false; int64_t into_cnt = 0; + bool skip_locked = false; ParamStore dummy_params; ObSqlString sql_str; ObPLExecCtx pl_ctx(cursor->get_allocator(), &exec_ctx, &dummy_params, @@ -865,6 +866,7 @@ int ObPLDbmsSql::do_parse(ObExecContext &exec_ctx, ObDbmsCursorInfo *cursor, ObS for_update, hidden_rowid, into_cnt, + skip_locked, &cursor->get_field_columns())); if (OB_SUCC(ret)) { cursor->set_ps_sql(ps_sql); diff --git a/src/sql/ob_result_set.cpp b/src/sql/ob_result_set.cpp index dd9fd8501f..ed63f76782 100644 --- a/src/sql/ob_result_set.cpp +++ b/src/sql/ob_result_set.cpp @@ -1249,6 +1249,7 @@ int ObResultSet::ExternalRetrieveInfo::build_into_exprs( } is_select_for_update_ = (static_cast(stmt)).has_for_update(); has_hidden_rowid_ = (static_cast(stmt)).has_hidden_rowid(); + is_skip_locked_ = (static_cast(stmt)).is_skip_locked(); } else if (stmt.is_insert_stmt() || stmt.is_update_stmt() || stmt.is_delete_stmt()) { ObDelUpdStmt &dml_stmt = static_cast(stmt); OZ (into_exprs_.assign(dml_stmt.get_returning_into_exprs())); diff --git a/src/sql/ob_result_set.h b/src/sql/ob_result_set.h index 0d6e42f694..8ea65b286f 100644 --- a/src/sql/ob_result_set.h +++ b/src/sql/ob_result_set.h @@ -75,7 +75,8 @@ public: has_hidden_rowid_(false), stmt_sql_(), is_bulk_(false), - has_link_table_(false) {} + has_link_table_(false), + is_skip_locked_(false) {} virtual ~ExternalRetrieveInfo() {} int build(ObStmt &stmt, @@ -98,6 +99,7 @@ public: ObString stmt_sql_; bool is_bulk_; bool has_link_table_; + bool is_skip_locked_; }; enum PsMode @@ -173,6 +175,7 @@ public: inline bool has_hidden_rowid(); inline bool is_bulk(); inline bool is_link_table(); + inline bool is_skip_locked(); /// whether the result is with rows (true for SELECT statement) bool is_with_rows() const; // tell mysql if need to do async end trans @@ -644,6 +647,11 @@ inline bool ObResultSet::is_link_table() return external_retrieve_info_.has_link_table_; } +inline bool ObResultSet::is_skip_locked() +{ + return external_retrieve_info_.is_skip_locked_; +} + inline bool ObResultSet::is_with_rows() const { return (p_field_columns_->count() > 0 && !is_prepare_stmt()); diff --git a/src/sql/ob_spi.cpp b/src/sql/ob_spi.cpp index 9ebf8164a9..8f77482aff 100644 --- a/src/sql/ob_spi.cpp +++ b/src/sql/ob_spi.cpp @@ -2316,6 +2316,7 @@ int ObSPIService::spi_resolve_prepare(common::ObIAllocator &allocator, prepare_result.has_hidden_rowid_ = pl_prepare_result.result_set_->has_hidden_rowid(); prepare_result.is_bulk_ = pl_prepare_result.result_set_->is_bulk(); prepare_result.has_link_table_ = pl_prepare_result.result_set_->is_link_table(); + prepare_result.is_skip_locked_ = pl_prepare_result.result_set_->is_skip_locked(); if (OB_FAIL(ret)) { } else if (OB_NOT_NULL(prepare_result.record_type_)) { if (stmt::T_SELECT != prepare_result.type_) { @@ -2454,12 +2455,13 @@ int ObSPIService::prepare_dynamic(ObPLExecCtx *ctx, stmt::StmtType &stmt_type, bool &for_update, bool &hidden_rowid, - int64_t &into_cnt) + int64_t &into_cnt, + bool &skip_locked) { int ret = OB_SUCCESS; OZ (calc_dynamic_sqlstr(ctx, sql_expr, sql_str)); OZ (prepare_dynamic(ctx, allocator, is_returning, false, param_cnt, sql_str, - ps_sql, stmt_type, for_update, hidden_rowid, into_cnt)); + ps_sql, stmt_type, for_update, hidden_rowid, into_cnt, skip_locked)); return ret; } @@ -2474,6 +2476,7 @@ int ObSPIService::prepare_dynamic(ObPLExecCtx *ctx, bool &for_update, bool &hidden_rowid, int64_t &into_cnt, + bool &skip_locked, common::ColumnsFieldArray *field_list) { int ret = OB_SUCCESS; @@ -2498,6 +2501,7 @@ int ObSPIService::prepare_dynamic(ObPLExecCtx *ctx, OX (for_update = pl_prepare_result.result_set_->get_is_select_for_update()); OX (hidden_rowid = pl_prepare_result.result_set_->has_hidden_rowid()); OX (into_cnt = pl_prepare_result.result_set_->get_into_exprs().count()); + OX (skip_locked = pl_prepare_result.result_set_->is_skip_locked()); if (OB_SUCC(ret) && NULL != field_list) { CK (OB_NOT_NULL(pl_prepare_result.result_set_->get_field_columns())); @@ -2656,6 +2660,7 @@ int ObSPIService::spi_execute_immediate(ObPLExecCtx *ctx, ObString ps_sql; bool for_update = false; bool hidden_rowid = false; + bool skip_locked = false; ObQueryRetryCtrl retry_ctrl; int64_t tenant_version = 0; int64_t sys_version = 0; @@ -2684,7 +2689,8 @@ int ObSPIService::spi_execute_immediate(ObPLExecCtx *ctx, stmt_type, for_update, hidden_rowid, - inner_into_cnt)); + inner_into_cnt, + skip_locked)); if (OB_SUCC(ret)) { if (ObStmt::is_ddl_stmt(stmt_type, false) && (into_count > 0 || param_count > 0 || is_returning)) { @@ -3374,6 +3380,7 @@ int ObSPIService::spi_dynamic_open(ObPLExecCtx *ctx, bool for_update = false; bool hidden_rowid = false; int64_t inner_into_cnt = 0; + bool skip_locked = false; OZ (prepare_dynamic(ctx, sql, @@ -3385,7 +3392,8 @@ int ObSPIService::spi_dynamic_open(ObPLExecCtx *ctx, stmt_type, for_update, hidden_rowid, - inner_into_cnt)); + inner_into_cnt, + skip_locked)); OZ (spi_cursor_open(ctx, sql_param_count > 0 ? NULL : sql_str.ptr(), ps_sql.ptr(),//trans to c-stype @@ -3399,7 +3407,8 @@ int ObSPIService::spi_dynamic_open(ObPLExecCtx *ctx, cursor_index, NULL/*formal_param_idxs*/, NULL/*actual_param_exprs*/, - 0/*cursor_param_count*/)); + 0/*cursor_param_count*/, + skip_locked)); return ret; } @@ -3486,7 +3495,8 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx, int64_t cursor_index, const int64_t *formal_param_idxs, const ObSqlExpression **actual_param_exprs, - int64_t cursor_param_count) + int64_t cursor_param_count, + bool skip_locked) { int ret = OB_SUCCESS; ObSQLSessionInfo *session_info = NULL; @@ -3554,8 +3564,8 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx, if (OB_FAIL(ret)) { // do nothing } else if (lib::is_oracle_mode() - && !for_update - && ((is_server_cursor && use_stream) || !is_server_cursor)) { + && ((is_server_cursor && use_stream) || !is_server_cursor) + && (!for_update || (for_update && skip_locked))) { cursor->set_streaming(); ObSPIResultSet *spi_result = NULL; if (OB_FAIL(ret)) { @@ -3572,7 +3582,7 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx, // 如果当前cursor已经有spi_result则复用,避免内存占用过多 retry_ctrl.clear_state_before_each_retry(session_info->get_retry_info_for_update()); OZ (cursor->prepare_spi_result(ctx, spi_result)); - OZ (spi_result->start_cursor_stmt(ctx, static_cast(type), false)); + OZ (spi_result->start_cursor_stmt(ctx, static_cast(type), for_update)); OZ ((GCTX.schema_service_->get_tenant_schema_guard(session_info->get_effective_tenant_id(), spi_result->get_scheme_guard()))); OX (spi_result->get_sql_ctx().schema_guard_ = &spi_result->get_scheme_guard()); OZ (spi_result->get_scheme_guard().get_schema_version(session_info->get_effective_tenant_id(), tenant_version)); @@ -3617,7 +3627,9 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx, //如果是客户端游标,设置结果集为二进制模式 OX (spi_result->get_result_set()->set_ps_protocol()); } - + OX (for_update ? cursor->set_for_update() : (void)NULL); + OX (for_update ? cursor->set_trans_id(session_info->get_tx_id()) : (void)NULL); + OX (has_hidden_rowid ? cursor->set_hidden_rowid() : (void)NULL); if (OB_SUCC(ret)) { transaction::ObTxReadSnapshot &snapshot = spi_result->get_result_set()->get_exec_context().get_das_ctx().get_snapshot(); @@ -6933,7 +6945,7 @@ int ObSPIService::get_result(ObPLExecCtx *ctx, LOG_WARN("invalid argument", K(ret)); } else { column_count = fields->count(); - actual_column_count = column_count; + actual_column_count = column_count - hidden_column_count; } for (int64_t i = 0; OB_SUCC(ret) && i < actual_column_count; ++i) { ObDataType type; diff --git a/src/sql/ob_spi.h b/src/sql/ob_spi.h index 8f32f8a5ed..66bba08fcb 100644 --- a/src/sql/ob_spi.h +++ b/src/sql/ob_spi.h @@ -257,7 +257,8 @@ public: ps_sql_(), is_bulk_(false), has_dup_column_name_(false), - has_link_table_(false) + has_link_table_(false), + is_skip_locked_(false) {} stmt::StmtType type_; //prepare的语句类型 bool for_update_; @@ -273,6 +274,7 @@ public: bool is_bulk_; bool has_dup_column_name_; bool has_link_table_; + bool is_skip_locked_; }; struct PLPrepareCtx @@ -499,7 +501,8 @@ public: int64_t cursor_index, const int64_t *formal_param_idxs, const ObSqlExpression **actual_param_exprs, - int64_t cursor_param_count); + int64_t cursor_param_count, + bool skip_locked); static int dbms_cursor_open(pl::ObPLExecCtx *ctx, pl::ObDbmsCursorInfo &cursor, const ObString &ps_sql, @@ -696,7 +699,8 @@ public: stmt::StmtType &type, bool &for_update, bool &hidden_rowid, - int64_t &into_cnt); + int64_t &into_cnt, + bool &skip_locked); static int prepare_dynamic(pl::ObPLExecCtx *ctx, ObIAllocator &allocator, bool is_returning, @@ -708,6 +712,7 @@ public: bool &for_update, bool &hidden_rowid, int64_t &into_cnt, + bool &skip_locked, common::ColumnsFieldArray *field_list = NULL); static int force_refresh_schema(uint64_t tenant_id); diff --git a/src/sql/resolver/dml/ob_select_stmt.cpp b/src/sql/resolver/dml/ob_select_stmt.cpp index 33d4033803..aa4256ca8d 100644 --- a/src/sql/resolver/dml/ob_select_stmt.cpp +++ b/src/sql/resolver/dml/ob_select_stmt.cpp @@ -791,6 +791,18 @@ bool ObSelectStmt::has_for_update() const return bret; } +bool ObSelectStmt::is_skip_locked() const +{ + bool bret = false; + for (int64_t i = 0; !bret && i < table_items_.count(); ++i) { + const TableItem *table_item = table_items_.at(i); + if (table_item != NULL && table_item->skip_locked_) { + bret = true; + } + } + return bret; +} + int ObSelectStmt::clear_sharable_expr_reference() { int ret = OB_SUCCESS; diff --git a/src/sql/resolver/dml/ob_select_stmt.h b/src/sql/resolver/dml/ob_select_stmt.h index 8abe5022c3..f893bb9c53 100644 --- a/src/sql/resolver/dml/ob_select_stmt.h +++ b/src/sql/resolver/dml/ob_select_stmt.h @@ -511,6 +511,7 @@ public: } int add_having_expr(ObRawExpr *expr) { return having_exprs_.push_back(expr); } bool has_for_update() const; + bool is_skip_locked() const; common::ObIArray &get_for_update_columns() { return for_update_columns_; } const common::ObIArray &get_for_update_columns() const { return for_update_columns_; } bool contain_ab_param() const { return contain_ab_param_; }