[#54380548]change for update skip locked cursor to stream mode

This commit is contained in:
seuwebber
2024-02-09 12:37:20 +00:00
committed by ob-robot
parent 53291d066b
commit d0fea03ca8
12 changed files with 91 additions and 27 deletions

View File

@ -1249,6 +1249,7 @@ int ObResultSet::ExternalRetrieveInfo::build_into_exprs(
}
is_select_for_update_ = (static_cast<ObSelectStmt&>(stmt)).has_for_update();
has_hidden_rowid_ = (static_cast<ObSelectStmt&>(stmt)).has_hidden_rowid();
is_skip_locked_ = (static_cast<ObSelectStmt&>(stmt)).is_skip_locked();
} else if (stmt.is_insert_stmt() || stmt.is_update_stmt() || stmt.is_delete_stmt()) {
ObDelUpdStmt &dml_stmt = static_cast<ObDelUpdStmt&>(stmt);
OZ (into_exprs_.assign(dml_stmt.get_returning_into_exprs()));

View File

@ -75,7 +75,8 @@ public:
has_hidden_rowid_(false),
stmt_sql_(),
is_bulk_(false),
has_link_table_(false) {}
has_link_table_(false),
is_skip_locked_(false) {}
virtual ~ExternalRetrieveInfo() {}
int build(ObStmt &stmt,
@ -98,6 +99,7 @@ public:
ObString stmt_sql_;
bool is_bulk_;
bool has_link_table_;
bool is_skip_locked_;
};
enum PsMode
@ -173,6 +175,7 @@ public:
inline bool has_hidden_rowid();
inline bool is_bulk();
inline bool is_link_table();
inline bool is_skip_locked();
/// whether the result is with rows (true for SELECT statement)
bool is_with_rows() const;
// tell mysql if need to do async end trans
@ -644,6 +647,11 @@ inline bool ObResultSet::is_link_table()
return external_retrieve_info_.has_link_table_;
}
inline bool ObResultSet::is_skip_locked()
{
return external_retrieve_info_.is_skip_locked_;
}
inline bool ObResultSet::is_with_rows() const
{
return (p_field_columns_->count() > 0 && !is_prepare_stmt());

View File

@ -2316,6 +2316,7 @@ int ObSPIService::spi_resolve_prepare(common::ObIAllocator &allocator,
prepare_result.has_hidden_rowid_ = pl_prepare_result.result_set_->has_hidden_rowid();
prepare_result.is_bulk_ = pl_prepare_result.result_set_->is_bulk();
prepare_result.has_link_table_ = pl_prepare_result.result_set_->is_link_table();
prepare_result.is_skip_locked_ = pl_prepare_result.result_set_->is_skip_locked();
if (OB_FAIL(ret)) {
} else if (OB_NOT_NULL(prepare_result.record_type_)) {
if (stmt::T_SELECT != prepare_result.type_) {
@ -2454,12 +2455,13 @@ int ObSPIService::prepare_dynamic(ObPLExecCtx *ctx,
stmt::StmtType &stmt_type,
bool &for_update,
bool &hidden_rowid,
int64_t &into_cnt)
int64_t &into_cnt,
bool &skip_locked)
{
int ret = OB_SUCCESS;
OZ (calc_dynamic_sqlstr(ctx, sql_expr, sql_str));
OZ (prepare_dynamic(ctx, allocator, is_returning, false, param_cnt, sql_str,
ps_sql, stmt_type, for_update, hidden_rowid, into_cnt));
ps_sql, stmt_type, for_update, hidden_rowid, into_cnt, skip_locked));
return ret;
}
@ -2474,6 +2476,7 @@ int ObSPIService::prepare_dynamic(ObPLExecCtx *ctx,
bool &for_update,
bool &hidden_rowid,
int64_t &into_cnt,
bool &skip_locked,
common::ColumnsFieldArray *field_list)
{
int ret = OB_SUCCESS;
@ -2498,6 +2501,7 @@ int ObSPIService::prepare_dynamic(ObPLExecCtx *ctx,
OX (for_update = pl_prepare_result.result_set_->get_is_select_for_update());
OX (hidden_rowid = pl_prepare_result.result_set_->has_hidden_rowid());
OX (into_cnt = pl_prepare_result.result_set_->get_into_exprs().count());
OX (skip_locked = pl_prepare_result.result_set_->is_skip_locked());
if (OB_SUCC(ret) && NULL != field_list) {
CK (OB_NOT_NULL(pl_prepare_result.result_set_->get_field_columns()));
@ -2656,6 +2660,7 @@ int ObSPIService::spi_execute_immediate(ObPLExecCtx *ctx,
ObString ps_sql;
bool for_update = false;
bool hidden_rowid = false;
bool skip_locked = false;
ObQueryRetryCtrl retry_ctrl;
int64_t tenant_version = 0;
int64_t sys_version = 0;
@ -2684,7 +2689,8 @@ int ObSPIService::spi_execute_immediate(ObPLExecCtx *ctx,
stmt_type,
for_update,
hidden_rowid,
inner_into_cnt));
inner_into_cnt,
skip_locked));
if (OB_SUCC(ret)) {
if (ObStmt::is_ddl_stmt(stmt_type, false)
&& (into_count > 0 || param_count > 0 || is_returning)) {
@ -3374,6 +3380,7 @@ int ObSPIService::spi_dynamic_open(ObPLExecCtx *ctx,
bool for_update = false;
bool hidden_rowid = false;
int64_t inner_into_cnt = 0;
bool skip_locked = false;
OZ (prepare_dynamic(ctx,
sql,
@ -3385,7 +3392,8 @@ int ObSPIService::spi_dynamic_open(ObPLExecCtx *ctx,
stmt_type,
for_update,
hidden_rowid,
inner_into_cnt));
inner_into_cnt,
skip_locked));
OZ (spi_cursor_open(ctx,
sql_param_count > 0 ? NULL : sql_str.ptr(),
ps_sql.ptr(),//trans to c-stype
@ -3399,7 +3407,8 @@ int ObSPIService::spi_dynamic_open(ObPLExecCtx *ctx,
cursor_index,
NULL/*formal_param_idxs*/,
NULL/*actual_param_exprs*/,
0/*cursor_param_count*/));
0/*cursor_param_count*/,
skip_locked));
return ret;
}
@ -3486,7 +3495,8 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx,
int64_t cursor_index,
const int64_t *formal_param_idxs,
const ObSqlExpression **actual_param_exprs,
int64_t cursor_param_count)
int64_t cursor_param_count,
bool skip_locked)
{
int ret = OB_SUCCESS;
ObSQLSessionInfo *session_info = NULL;
@ -3554,8 +3564,8 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx,
if (OB_FAIL(ret)) {
// do nothing
} else if (lib::is_oracle_mode()
&& !for_update
&& ((is_server_cursor && use_stream) || !is_server_cursor)) {
&& ((is_server_cursor && use_stream) || !is_server_cursor)
&& (!for_update || (for_update && skip_locked))) {
cursor->set_streaming();
ObSPIResultSet *spi_result = NULL;
if (OB_FAIL(ret)) {
@ -3572,7 +3582,7 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx,
// 如果当前cursor已经有spi_result则复用,避免内存占用过多
retry_ctrl.clear_state_before_each_retry(session_info->get_retry_info_for_update());
OZ (cursor->prepare_spi_result(ctx, spi_result));
OZ (spi_result->start_cursor_stmt(ctx, static_cast<stmt::StmtType>(type), false));
OZ (spi_result->start_cursor_stmt(ctx, static_cast<stmt::StmtType>(type), for_update));
OZ ((GCTX.schema_service_->get_tenant_schema_guard(session_info->get_effective_tenant_id(), spi_result->get_scheme_guard())));
OX (spi_result->get_sql_ctx().schema_guard_ = &spi_result->get_scheme_guard());
OZ (spi_result->get_scheme_guard().get_schema_version(session_info->get_effective_tenant_id(), tenant_version));
@ -3617,7 +3627,9 @@ int ObSPIService::spi_cursor_open(ObPLExecCtx *ctx,
//如果是客户端游标,设置结果集为二进制模式
OX (spi_result->get_result_set()->set_ps_protocol());
}
OX (for_update ? cursor->set_for_update() : (void)NULL);
OX (for_update ? cursor->set_trans_id(session_info->get_tx_id()) : (void)NULL);
OX (has_hidden_rowid ? cursor->set_hidden_rowid() : (void)NULL);
if (OB_SUCC(ret)) {
transaction::ObTxReadSnapshot &snapshot =
spi_result->get_result_set()->get_exec_context().get_das_ctx().get_snapshot();
@ -6933,7 +6945,7 @@ int ObSPIService::get_result(ObPLExecCtx *ctx,
LOG_WARN("invalid argument", K(ret));
} else {
column_count = fields->count();
actual_column_count = column_count;
actual_column_count = column_count - hidden_column_count;
}
for (int64_t i = 0; OB_SUCC(ret) && i < actual_column_count; ++i) {
ObDataType type;

View File

@ -257,7 +257,8 @@ public:
ps_sql_(),
is_bulk_(false),
has_dup_column_name_(false),
has_link_table_(false)
has_link_table_(false),
is_skip_locked_(false)
{}
stmt::StmtType type_; //prepare的语句类型
bool for_update_;
@ -273,6 +274,7 @@ public:
bool is_bulk_;
bool has_dup_column_name_;
bool has_link_table_;
bool is_skip_locked_;
};
struct PLPrepareCtx
@ -499,7 +501,8 @@ public:
int64_t cursor_index,
const int64_t *formal_param_idxs,
const ObSqlExpression **actual_param_exprs,
int64_t cursor_param_count);
int64_t cursor_param_count,
bool skip_locked);
static int dbms_cursor_open(pl::ObPLExecCtx *ctx,
pl::ObDbmsCursorInfo &cursor,
const ObString &ps_sql,
@ -696,7 +699,8 @@ public:
stmt::StmtType &type,
bool &for_update,
bool &hidden_rowid,
int64_t &into_cnt);
int64_t &into_cnt,
bool &skip_locked);
static int prepare_dynamic(pl::ObPLExecCtx *ctx,
ObIAllocator &allocator,
bool is_returning,
@ -708,6 +712,7 @@ public:
bool &for_update,
bool &hidden_rowid,
int64_t &into_cnt,
bool &skip_locked,
common::ColumnsFieldArray *field_list = NULL);
static int force_refresh_schema(uint64_t tenant_id);

View File

@ -791,6 +791,18 @@ bool ObSelectStmt::has_for_update() const
return bret;
}
bool ObSelectStmt::is_skip_locked() const
{
bool bret = false;
for (int64_t i = 0; !bret && i < table_items_.count(); ++i) {
const TableItem *table_item = table_items_.at(i);
if (table_item != NULL && table_item->skip_locked_) {
bret = true;
}
}
return bret;
}
int ObSelectStmt::clear_sharable_expr_reference()
{
int ret = OB_SUCCESS;

View File

@ -511,6 +511,7 @@ public:
}
int add_having_expr(ObRawExpr *expr) { return having_exprs_.push_back(expr); }
bool has_for_update() const;
bool is_skip_locked() const;
common::ObIArray<ObColumnRefRawExpr*> &get_for_update_columns() { return for_update_columns_; }
const common::ObIArray<ObColumnRefRawExpr *> &get_for_update_columns() const { return for_update_columns_; }
bool contain_ab_param() const { return contain_ab_param_; }