From dfbfe7050d5c1e7f078b949b28f16a6384146210 Mon Sep 17 00:00:00 2001 From: obdev Date: Sat, 28 Jan 2023 16:30:04 +0800 Subject: [PATCH] [FEAT MERGE] Merge foreign key refactor to master; --- src/observer/ob_inner_sql_result.cpp | 2 + .../table/ob_table_insert_up_executor.cpp | 9 +- .../table/ob_table_modify_executor.cpp | 10 +- src/share/ob_debug_sync_point.h | 1 + src/share/ob_i_tablet_scan.h | 2 + src/sql/das/ob_das_delete_op.cpp | 4 +- src/sql/das/ob_das_delete_op.h | 2 +- src/sql/das/ob_das_dml_ctx_define.cpp | 2 +- src/sql/das/ob_das_dml_ctx_define.h | 1 + src/sql/das/ob_das_insert_op.cpp | 4 +- src/sql/das/ob_das_insert_op.h | 2 +- src/sql/das/ob_das_lock_op.cpp | 4 +- src/sql/das/ob_das_lock_op.h | 2 +- src/sql/das/ob_das_scan_op.cpp | 18 +- src/sql/das/ob_das_scan_op.h | 4 +- src/sql/das/ob_das_update_op.cpp | 4 +- src/sql/das/ob_das_update_op.h | 2 +- src/sql/engine/dml/ob_dml_ctx_define.h | 29 ++- src/sql/engine/dml/ob_dml_service.cpp | 179 +++++++++++++----- src/sql/engine/dml/ob_dml_service.h | 24 ++- src/sql/engine/dml/ob_table_delete_op.cpp | 21 +- src/sql/engine/dml/ob_table_delete_op.h | 1 + src/sql/engine/dml/ob_table_insert_all_op.cpp | 38 +++- src/sql/engine/dml/ob_table_insert_all_op.h | 1 + src/sql/engine/dml/ob_table_insert_op.cpp | 12 +- src/sql/engine/dml/ob_table_insert_up_op.cpp | 83 +++++--- src/sql/engine/dml/ob_table_insert_up_op.h | 4 + src/sql/engine/dml/ob_table_lock_op.cpp | 12 +- src/sql/engine/dml/ob_table_merge_op.cpp | 25 +-- src/sql/engine/dml/ob_table_modify_op.cpp | 46 +++-- src/sql/engine/dml/ob_table_modify_op.h | 14 ++ src/sql/engine/dml/ob_table_replace_op.cpp | 40 ++-- src/sql/engine/dml/ob_table_update_op.cpp | 9 +- src/sql/engine/dml/ob_table_update_op.h | 1 + .../static/ob_px_multi_part_delete_op.cpp | 5 +- .../static/ob_px_multi_part_insert_op.cpp | 5 +- .../static/ob_px_multi_part_update_op.cpp | 5 +- src/sql/engine/table/ob_table_scan_op.cpp | 3 + src/sql/engine/table/ob_table_scan_op.h | 2 + src/storage/CMakeLists.txt | 1 + src/storage/access/ob_table_access_param.cpp | 3 + src/storage/access/ob_table_access_param.h | 1 + .../ob_micro_block_row_scanner.cpp | 65 ++++++- .../blocksstable/ob_micro_block_row_scanner.h | 4 + src/storage/memtable/mvcc/ob_mvcc_acc_ctx.h | 15 +- .../memtable/mvcc/ob_mvcc_iterator.cpp | 17 ++ src/storage/memtable/mvcc/ob_mvcc_iterator.h | 9 + src/storage/memtable/ob_memtable.cpp | 14 ++ src/storage/memtable/ob_memtable_iterator.cpp | 23 ++- .../memtable/ob_row_conflict_handler.cpp | 171 +++++++++++++++++ .../memtable/ob_row_conflict_handler.h | 70 +++++++ src/storage/tx/ob_trans_define_v4.cpp | 36 +++- src/storage/tx/ob_trans_define_v4.h | 2 + src/storage/tx_storage/ob_access_service.cpp | 20 ++ 54 files changed, 882 insertions(+), 201 deletions(-) create mode 100644 src/storage/memtable/ob_row_conflict_handler.cpp create mode 100644 src/storage/memtable/ob_row_conflict_handler.h diff --git a/src/observer/ob_inner_sql_result.cpp b/src/observer/ob_inner_sql_result.cpp index ff7662dc3f..dd9eb2320a 100644 --- a/src/observer/ob_inner_sql_result.cpp +++ b/src/observer/ob_inner_sql_result.cpp @@ -148,6 +148,8 @@ int ObInnerSQLResult::open() LOG_WARN("open result set failed", K(ret)); // move after precess_retry(). // result_set_->close(); + } else if (has_tenant_resource() && OB_FAIL(result_set_->get_exec_context().check_status())) { + LOG_WARN("failed check status", K(ret)); } else if (is_read_&& is_select) { //prefetch 1 row for throwing error code and retry opened_ = true; diff --git a/src/observer/table/ob_table_insert_up_executor.cpp b/src/observer/table/ob_table_insert_up_executor.cpp index 62633d0c08..ab9e1c0885 100644 --- a/src/observer/table/ob_table_insert_up_executor.cpp +++ b/src/observer/table/ob_table_insert_up_executor.cpp @@ -387,7 +387,7 @@ int ObTableApiInsertUpExecutor::delete_upd_old_row_to_das(const ObRowkey &constr LOG_WARN("fail to generate del rtdef for update", K(ret), K(upd_ctdef), K(upd_rtdef)); } } - + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(ret)) { //do nothing } else if (OB_ISNULL(upd_rtdef.ddel_rtdef_)) { @@ -397,7 +397,8 @@ int ObTableApiInsertUpExecutor::delete_upd_old_row_to_das(const ObRowkey &constr *upd_rtdef.ddel_rtdef_, tablet_loc, upd_rtctx_, - upd_ctdef.old_row_))) { + upd_ctdef.old_row_, + stored_row))) { LOG_WARN("fail to delete row with das", K(ret)); } } @@ -470,6 +471,7 @@ int ObTableApiInsertUpExecutor::insert_upd_new_row_to_das() } clear_evaluated_flag(); + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(ret)) { //do nothing } else if (OB_ISNULL(upd_rtdef.dins_rtdef_)) { @@ -479,7 +481,8 @@ int ObTableApiInsertUpExecutor::insert_upd_new_row_to_das() *upd_rtdef.dins_rtdef_, tablet_loc, upd_rtctx_, - upd_ctdef.new_row_))) { + upd_ctdef.new_row_, + stored_row))) { LOG_WARN("fail to insert row with das", K(ret)); } } diff --git a/src/observer/table/ob_table_modify_executor.cpp b/src/observer/table/ob_table_modify_executor.cpp index f5a3ffcf14..0754fdcac5 100644 --- a/src/observer/table/ob_table_modify_executor.cpp +++ b/src/observer/table/ob_table_modify_executor.cpp @@ -233,14 +233,15 @@ int ObTableApiModifyExecutor::insert_row_to_das(const ObTableInsCtDef &ins_ctdef { int ret = OB_SUCCESS; ObDASTabletLoc *tablet_loc = nullptr; - + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(calc_tablet_loc(tablet_loc))) { LOG_WARN("fail to calc partition key", K(ret)); } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef.das_ctdef_, ins_rtdef.das_rtdef_, tablet_loc, dml_rtctx_, - ins_ctdef.new_row_))) { + ins_ctdef.new_row_, + stored_row))) { LOG_WARN("fail to insert row by dml service", K(ret)); } @@ -252,7 +253,7 @@ int ObTableApiModifyExecutor::delete_row_to_das(const ObTableDelCtDef &del_ctdef { int ret = OB_SUCCESS; ObDASTabletLoc *tablet_loc = nullptr; - + ObChunkDatumStore::StoredRow* stored_row = nullptr; // todo:linjing check rowkey null and skip if (OB_FAIL(calc_tablet_loc(tablet_loc))) { LOG_WARN("fail tp calc tablet location", K(ret)); @@ -260,7 +261,8 @@ int ObTableApiModifyExecutor::delete_row_to_das(const ObTableDelCtDef &del_ctdef del_rtdef.das_rtdef_, tablet_loc, dml_rtctx_, - del_ctdef.old_row_))) { + del_ctdef.old_row_, + stored_row))) { LOG_WARN("fail to delete row to das op", K(ret), K(del_ctdef), K(del_rtdef)); } diff --git a/src/share/ob_debug_sync_point.h b/src/share/ob_debug_sync_point.h index fb1019e80e..2a97b5cecd 100644 --- a/src/share/ob_debug_sync_point.h +++ b/src/share/ob_debug_sync_point.h @@ -432,6 +432,7 @@ class ObString; ACT(ALTER_LS_CHOOSE_SRC,)\ ACT(BEFORE_LOCK_SERVICE_UNLOCK,)\ ACT(DDL_CHECK_TABLET_MERGE_STATUS,)\ + ACT(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK,)\ ACT(MODIFY_HIDDEN_TABLE_NOT_NULL_COLUMN_STATE_BEFORE_PUBLISH_SCHEMA,)\ ACT(AFTER_MIGRATION_FETCH_TABLET_INFO,)\ ACT(BEFORE_SET_LS_MEMBER_LIST,)\ diff --git a/src/share/ob_i_tablet_scan.h b/src/share/ob_i_tablet_scan.h index fdb0455551..36db634c93 100644 --- a/src/share/ob_i_tablet_scan.h +++ b/src/share/ob_i_tablet_scan.h @@ -210,6 +210,7 @@ ObVTableScanParam() : fb_snapshot_(), is_get_(false), force_refresh_lc_(false), + is_for_foreign_check_(false), output_exprs_(NULL), aggregate_exprs_(NULL), op_(NULL), @@ -273,6 +274,7 @@ ObVTableScanParam() : share::SCN fb_snapshot_; bool is_get_; bool force_refresh_lc_; + bool is_for_foreign_check_; // // for static typing engine, set to NULL if the old engine is used diff --git a/src/sql/das/ob_das_delete_op.cpp b/src/sql/das/ob_das_delete_op.cpp index b0c976c7b7..9fb3d11310 100644 --- a/src/sql/das/ob_das_delete_op.cpp +++ b/src/sql/das/ob_das_delete_op.cpp @@ -155,12 +155,12 @@ int ObDASDeleteOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASDeleteOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full) +int ObDASDeleteOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, added, true))) { + if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to datum store failed", K(ret), K(row), K(write_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_delete_op.h b/src/sql/das/ob_das_delete_op.h index 8438c9a632..79bb6345fe 100644 --- a/src/sql/das/ob_das_delete_op.h +++ b/src/sql/das/ob_das_delete_op.h @@ -35,7 +35,7 @@ public: virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return del_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return del_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full); + int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); int64_t get_row_cnt() const { return write_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASDelCtDef *del_ctdef) { del_ctdef_ = del_ctdef; } void set_das_rtdef(ObDASDelRtDef *del_rtdef) { del_rtdef_ = del_rtdef; } diff --git a/src/sql/das/ob_das_dml_ctx_define.cpp b/src/sql/das/ob_das_dml_ctx_define.cpp index 6c74d3b62d..4ff10d25ef 100644 --- a/src/sql/das/ob_das_dml_ctx_define.cpp +++ b/src/sql/das/ob_das_dml_ctx_define.cpp @@ -365,6 +365,7 @@ int ObDASWriteBuffer::init_dml_shadow_row(int64_t column_cnt, bool strip_lob_loc int ObDASWriteBuffer::try_add_row(const ObIArray &exprs, ObEvalCtx *ctx, const int64_t memory_limit, + DmlRow* &stored_row, bool &row_added, bool strip_lob_locator) { @@ -377,7 +378,6 @@ int ObDASWriteBuffer::try_add_row(const ObIArray &exprs, dml_shadow_row_->reuse(); } if (OB_SUCC(ret)) { - DmlRow *stored_row = nullptr; if (OB_FAIL(dml_shadow_row_->shadow_copy(exprs, *ctx))) { LOG_WARN("shadow copy dml row failed", K(ret)); } else if (OB_FAIL(try_add_row(*dml_shadow_row_, memory_limit, row_added, &stored_row))) { diff --git a/src/sql/das/ob_das_dml_ctx_define.h b/src/sql/das/ob_das_dml_ctx_define.h index 0474492010..506240ef90 100644 --- a/src/sql/das/ob_das_dml_ctx_define.h +++ b/src/sql/das/ob_das_dml_ctx_define.h @@ -379,6 +379,7 @@ public: int try_add_row(const common::ObIArray &exprs, ObEvalCtx *ctx, const int64_t memory_limit, + DmlRow* &stored_row, bool &row_added, bool strip_lob_locator); int try_add_row(const DmlShadowRow &sr, const int64_t memory_limit, bool &row_added, DmlRow **stored_row = nullptr); diff --git a/src/sql/das/ob_das_insert_op.cpp b/src/sql/das/ob_das_insert_op.cpp index 82ec5231bd..ace0df8891 100644 --- a/src/sql/das/ob_das_insert_op.cpp +++ b/src/sql/das/ob_das_insert_op.cpp @@ -356,12 +356,12 @@ int ObDASInsertOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASInsertOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full) +int ObDASInsertOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(insert_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, added, true))) { + if (OB_FAIL(insert_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to insert buffer failed", K(ret), K(row), K(insert_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_insert_op.h b/src/sql/das/ob_das_insert_op.h index 95f917edaa..c375d2f2ab 100644 --- a/src/sql/das/ob_das_insert_op.h +++ b/src/sql/das/ob_das_insert_op.h @@ -38,7 +38,7 @@ public: virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return ins_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return ins_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full); + int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); int64_t get_row_cnt() const { return insert_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASInsCtDef *ins_ctdef) { ins_ctdef_ = ins_ctdef; } void set_das_rtdef(ObDASInsRtDef *ins_rtdef) { ins_rtdef_ = ins_rtdef; } diff --git a/src/sql/das/ob_das_lock_op.cpp b/src/sql/das/ob_das_lock_op.cpp index 36ad4b8ba4..efbd796681 100644 --- a/src/sql/das/ob_das_lock_op.cpp +++ b/src/sql/das/ob_das_lock_op.cpp @@ -134,12 +134,12 @@ int ObDASLockOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASLockOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full) +int ObDASLockOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(lock_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, added, true))) { + if (OB_FAIL(lock_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to lock buffer failed", K(ret), K(row), K(lock_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_lock_op.h b/src/sql/das/ob_das_lock_op.h index 32922527cd..4812974974 100644 --- a/src/sql/das/ob_das_lock_op.h +++ b/src/sql/das/ob_das_lock_op.h @@ -35,7 +35,7 @@ public: virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return lock_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return lock_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full); + int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); int64_t get_row_cnt() const { return lock_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASLockCtDef *del_ctdef) { lock_ctdef_ = del_ctdef; } void set_das_rtdef(ObDASLockRtDef *del_rtdef) { lock_rtdef_ = del_rtdef; } diff --git a/src/sql/das/ob_das_scan_op.cpp b/src/sql/das/ob_das_scan_op.cpp index 72167f48a5..6d1bdae64e 100644 --- a/src/sql/das/ob_das_scan_op.cpp +++ b/src/sql/das/ob_das_scan_op.cpp @@ -1,4 +1,4 @@ -/** +/**ob_das_scan_op.cpp * Copyright (c) 2021 OceanBase * OceanBase CE is licensed under Mulan PubL v2. * You can use this software according to the terms and conditions of the Mulan PubL v2. @@ -67,7 +67,8 @@ OB_DEF_SERIALIZE(ObDASScanRtDef) sql_mode_, scan_flag_, pd_storage_flag_, - need_check_output_datum_); + need_check_output_datum_, + is_for_foreign_check_); return ret; } @@ -86,7 +87,8 @@ OB_DEF_DESERIALIZE(ObDASScanRtDef) sql_mode_, scan_flag_, pd_storage_flag_, - need_check_output_datum_); + need_check_output_datum_, + is_for_foreign_check_); if (OB_SUCC(ret)) { (void)ObSQLUtils::adjust_time_by_ntp_offset(timeout_ts_); } @@ -108,7 +110,8 @@ OB_DEF_SERIALIZE_SIZE(ObDASScanRtDef) sql_mode_, scan_flag_, pd_storage_flag_, - need_check_output_datum_); + need_check_output_datum_, + is_for_foreign_check_); return len; } @@ -202,6 +205,7 @@ int ObDASScanOp::init_scan_param() scan_param_.tx_lock_timeout_ = scan_rtdef_->tx_lock_timeout_; scan_param_.index_id_ = scan_ctdef_->ref_table_id_; scan_param_.is_get_ = scan_ctdef_->is_get_; + scan_param_.is_for_foreign_check_ = scan_rtdef_->is_for_foreign_check_; scan_param_.timeout_ = scan_rtdef_->timeout_ts_; scan_param_.scan_flag_ = scan_rtdef_->scan_flag_; scan_param_.reserved_cell_count_ = scan_ctdef_->access_column_ids_.count(); @@ -219,8 +223,10 @@ int ObDASScanOp::init_scan_param() scan_param_.tenant_schema_version_ = scan_rtdef_->tenant_schema_version_; scan_param_.limit_param_ = scan_rtdef_->limit_param_; scan_param_.need_scn_ = scan_rtdef_->need_scn_; - scan_param_.pd_storage_flag_ = scan_ctdef_->pd_expr_spec_.pd_storage_flag_; - scan_param_.fb_snapshot_ = scan_rtdef_->fb_snapshot_; + scan_param_.pd_storage_flag_ = scan_ctdef_->pd_expr_spec_.pd_storage_flag_; scan_param_.fb_snapshot_ = scan_rtdef_->fb_snapshot_; + if (scan_rtdef_->is_for_foreign_check_) { + scan_param_.trans_desc_ = trans_desc_; + } scan_param_.ls_id_ = ls_id_; scan_param_.tablet_id_ = tablet_id_; if (scan_rtdef_->sample_info_ != nullptr) { diff --git a/src/sql/das/ob_das_scan_op.h b/src/sql/das/ob_das_scan_op.h index d49e11410c..240171896e 100644 --- a/src/sql/das/ob_das_scan_op.h +++ b/src/sql/das/ob_das_scan_op.h @@ -101,7 +101,8 @@ public: pd_storage_flag_(false), stmt_allocator_("StmtScanAlloc"), scan_allocator_("TableScanAlloc"), - sample_info_(nullptr) + sample_info_(nullptr), + is_for_foreign_check_(false) { } virtual ~ObDASScanRtDef(); INHERIT_TO_STRING_KV("ObDASBaseRtDef", ObDASBaseRtDef, @@ -133,6 +134,7 @@ public: common::ObWrapperAllocatorWithAttr stmt_allocator_; common::ObWrapperAllocatorWithAttr scan_allocator_; const common::SampleInfo *sample_info_; //Block(Row)SampleScan, only support local das scan + bool is_for_foreign_check_; private: union { storage::ObRow2ExprsProjector row2exprs_projector_; diff --git a/src/sql/das/ob_das_update_op.cpp b/src/sql/das/ob_das_update_op.cpp index dfaac96988..274dc6817c 100644 --- a/src/sql/das/ob_das_update_op.cpp +++ b/src/sql/das/ob_das_update_op.cpp @@ -368,12 +368,12 @@ int ObDASUpdateOp::swizzling_remote_task(ObDASRemoteInfo *remote_info) return ret; } -int ObDASUpdateOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full) +int ObDASUpdateOp::write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full) { int ret = OB_SUCCESS; bool added = false; buffer_full = false; - if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, added, true))) { + if (OB_FAIL(write_buffer_.try_add_row(row, &eval_ctx, das::OB_DAS_MAX_PACKET_SIZE, stored_row, added, true))) { LOG_WARN("try add row to datum store failed", K(ret), K(row), K(write_buffer_)); } else if (!added) { buffer_full = true; diff --git a/src/sql/das/ob_das_update_op.h b/src/sql/das/ob_das_update_op.h index 7ef5d0e2ff..0d3993ff68 100644 --- a/src/sql/das/ob_das_update_op.h +++ b/src/sql/das/ob_das_update_op.h @@ -35,7 +35,7 @@ public: virtual int swizzling_remote_task(ObDASRemoteInfo *remote_info) override; virtual const ObDASBaseCtDef *get_ctdef() const override { return upd_ctdef_; } virtual ObDASBaseRtDef *get_rtdef() override { return upd_rtdef_; } - int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, bool &buffer_full); + int write_row(const ExprFixedArray &row, ObEvalCtx &eval_ctx, ObChunkDatumStore::StoredRow* &stored_row, bool &buffer_full); int64_t get_row_cnt() const { return write_buffer_.get_row_cnt(); } void set_das_ctdef(const ObDASUpdCtDef *upd_ctdef) { upd_ctdef_ = upd_ctdef; } void set_das_rtdef(ObDASUpdRtDef *upd_rtdef) { upd_rtdef_ = upd_rtdef; } diff --git a/src/sql/engine/dml/ob_dml_ctx_define.h b/src/sql/engine/dml/ob_dml_ctx_define.h index 8d11e113f0..4d48580125 100644 --- a/src/sql/engine/dml/ob_dml_ctx_define.h +++ b/src/sql/engine/dml/ob_dml_ctx_define.h @@ -384,6 +384,7 @@ public: K_(column_ids), K_(old_row), K_(new_row), + K_(full_row), K_(view_check_exprs), K_(is_primary_index), K_(is_heap_table), @@ -397,6 +398,7 @@ public: UIntFixedArray column_ids_; ExprFixedArray old_row_; ExprFixedArray new_row_; + ExprFixedArray full_row_; //reference the das base ctdef to facilitate //some modules to access the das ctdef //don't need to serialize @@ -420,6 +422,7 @@ protected: column_ids_(alloc), old_row_(alloc), new_row_(alloc), + full_row_(alloc), das_base_ctdef_(das_base_ctdef), error_logging_ctdef_(alloc), view_check_exprs_(alloc), @@ -541,7 +544,6 @@ struct ObUpdCtDef : ObDMLBaseCtDef public: ObUpdCtDef(common::ObIAllocator &alloc) : ObDMLBaseCtDef(alloc, dupd_ctdef_, DAS_OP_TABLE_UPDATE), - full_row_(alloc), dupd_ctdef_(alloc), need_check_filter_null_(false), distinct_algo_(T_DISTINCT_NONE), @@ -557,7 +559,6 @@ public: alloc_(alloc) { } INHERIT_TO_STRING_KV("ObDMLBaseCtDef", ObDMLBaseCtDef, - K_(full_row), K_(dupd_ctdef), K_(need_check_filter_null), K_(distinct_algo), @@ -571,7 +572,6 @@ public: K_(related_upd_ctdefs), K_(related_del_ctdefs), K_(related_ins_ctdefs)); - ExprFixedArray full_row_; ObDASUpdCtDef dupd_ctdef_; bool need_check_filter_null_; DistinctType distinct_algo_; @@ -967,6 +967,29 @@ public: private: common::ObIAllocator &alloc_; }; + +struct ObDMLModifyRowNode +{ +public: + ObDMLModifyRowNode(ObTableModifyOp *dml_op, const ObDMLBaseCtDef *dml_ctdef, ObDMLBaseRtDef *dml_rtdef, const ObDmlEventType dml_event) + : new_row_(nullptr), + old_row_(nullptr), + full_row_(nullptr), + dml_op_(dml_op), + dml_ctdef_(dml_ctdef), + dml_rtdef_(dml_rtdef), + dml_event_(dml_event) + {} + ObChunkDatumStore::StoredRow *new_row_; + ObChunkDatumStore::StoredRow *old_row_; + ObChunkDatumStore::StoredRow *full_row_; + ObTableModifyOp *dml_op_; + const ObDMLBaseCtDef *dml_ctdef_; + ObDMLBaseRtDef *dml_rtdef_; + ObDmlEventType dml_event_; +}; + +typedef common::ObList ObDMLModifyRowsList; } // namespace sql } // namespace oceanbase #endif /* DEV_SRC_SQL_ENGINE_DML_OB_DML_CTX_DEFINE_H_ */ diff --git a/src/sql/engine/dml/ob_dml_service.cpp b/src/sql/engine/dml/ob_dml_service.cpp index d73b836f13..15b2e6f43f 100644 --- a/src/sql/engine/dml/ob_dml_service.cpp +++ b/src/sql/engine/dml/ob_dml_service.cpp @@ -485,8 +485,6 @@ int ObDMLService::process_insert_row(const ObInsCtDef &ins_ctdef, ins_ctdef.das_ctdef_.is_ignore_, dml_op))) { LOG_WARN("check row null failed", K(ret)); - } else if (OB_FAIL(ForeignKeyHandle::do_handle(dml_op, ins_ctdef, ins_rtdef))) { - LOG_WARN("do handle new row with foreign key failed", K(ret)); } else if (OB_FAIL(filter_row_for_view_check(ins_ctdef.view_check_exprs_, eval_ctx, is_filtered))) { //check column constraint expr @@ -582,7 +580,7 @@ int ObDMLService::process_delete_row(const ObDelCtDef &del_ctdef, } } - if (OB_SUCC(ret) && !is_skipped && !OB_ISNULL(del_rtdef.se_rowkey_dist_ctx_) && !has_instead_of_trg) { + if (OB_SUCC(ret) && !is_skipped && OB_NOT_NULL(del_rtdef.se_rowkey_dist_ctx_) && !has_instead_of_trg) { bool is_distinct = false; ObExecContext *root_ctx = nullptr; if (OB_FAIL(dml_op.get_exec_ctx().get_root_ctx(root_ctx))) { @@ -606,17 +604,12 @@ int ObDMLService::process_delete_row(const ObDelCtDef &del_ctdef, } if (OB_SUCC(ret) && !is_skipped) { - if (!has_instead_of_trg && OB_FAIL(ForeignKeyHandle::do_handle(dml_op, del_ctdef, del_rtdef))) { - LOG_WARN("do handle old row for delete op failed", K(ret), K(del_ctdef), K(del_rtdef)); - } else if (OB_FAIL(TriggerHandle::init_param_old_row( + if (OB_FAIL(TriggerHandle::init_param_old_row( dml_op.get_eval_ctx(), del_ctdef.trig_ctdef_, del_rtdef.trig_rtdef_))) { LOG_WARN("failed to handle before trigger", K(ret)); } else if (OB_FAIL(TriggerHandle::do_handle_before_row( dml_op, del_ctdef.das_base_ctdef_, del_ctdef.trig_ctdef_, del_rtdef.trig_rtdef_))) { LOG_WARN("failed to handle before trigger", K(ret)); - } else if (OB_SUCC(ret) && OB_FAIL(TriggerHandle::do_handle_after_row( - dml_op, del_ctdef.trig_ctdef_, del_rtdef.trig_rtdef_, ObTriggerEvents::get_delete_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); } else if (has_instead_of_trg) { is_skipped = true; } @@ -710,8 +703,6 @@ int ObDMLService::process_update_row(const ObUpdCtDef &upd_ctdef, LOG_WARN("check row whether changed failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else if (OB_UNLIKELY(!upd_rtdef.is_row_changed_)) { //do nothing - } else if (OB_FAIL(ForeignKeyHandle::do_handle(dml_op, upd_ctdef, upd_rtdef))) { - LOG_WARN("do handle row for update op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else if (OB_FAIL(filter_row_for_view_check(upd_ctdef.view_check_exprs_, dml_op.get_eval_ctx(), is_filtered))) { LOG_WARN("filter row for view check exprs failed", K(ret)); } else if (OB_UNLIKELY(is_filtered)) { @@ -764,7 +755,8 @@ int ObDMLService::process_update_row(const ObUpdCtDef &upd_ctdef, int ObDMLService::insert_row(const ObInsCtDef &ins_ctdef, ObInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, - ObDMLRtCtx &dml_rtctx) + ObDMLRtCtx &dml_rtctx, + ObChunkDatumStore::StoredRow* &stored_row) { int ret = OB_SUCCESS; if (OB_FAIL(check_dml_tablet_validity(dml_rtctx, @@ -777,7 +769,8 @@ int ObDMLService::insert_row(const ObInsCtDef &ins_ctdef, ins_rtdef.das_rtdef_, tablet_loc, dml_rtctx, - ins_ctdef.new_row_))) { + ins_ctdef.new_row_, + stored_row))) { LOG_WARN("insert row to das failed", K(ret)); } return ret; @@ -787,15 +780,19 @@ int ObDMLService::insert_row(const ObDASInsCtDef &ins_ctdef, ObDASInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - const ExprFixedArray &new_row) + const ExprFixedArray &new_row, + ObChunkDatumStore::StoredRow* &stored_row) { - return write_row_to_das_op(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx, new_row); + int ret = OB_SUCCESS; + ret = write_row_to_das_op(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx, new_row, stored_row); + return ret; } int ObDMLService::delete_row(const ObDelCtDef &del_ctdef, ObDelRtDef &del_rtdef, const ObDASTabletLoc *tablet_loc, - ObDMLRtCtx &dml_rtctx) + ObDMLRtCtx &dml_rtctx, + ObChunkDatumStore::StoredRow* &stored_row) { int ret = OB_SUCCESS; if (OB_FAIL(check_dml_tablet_validity(dml_rtctx, @@ -808,7 +805,8 @@ int ObDMLService::delete_row(const ObDelCtDef &del_ctdef, del_rtdef.das_rtdef_, tablet_loc, dml_rtctx, - del_ctdef.old_row_))) { + del_ctdef.old_row_, + stored_row))) { LOG_WARN("delete old row from das failed", K(ret)); } return ret; @@ -819,6 +817,7 @@ int ObDMLService::lock_row(const ObLockCtDef &lock_ctdef, ObDMLRtCtx &dml_rtctx) { int ret = OB_SUCCESS; + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(check_dml_tablet_validity(dml_rtctx, *tablet_loc, lock_ctdef.old_row_, @@ -829,7 +828,8 @@ int ObDMLService::lock_row(const ObLockCtDef &lock_ctdef, lock_rtdef.das_rtdef_, tablet_loc, dml_rtctx, - lock_ctdef.old_row_))) { + lock_ctdef.old_row_, + stored_row))) { LOG_WARN("lock row to das failed", K(ret)); } return ret; @@ -841,18 +841,23 @@ int ObDMLService::lock_row(const ObDASLockCtDef &dlock_ctdef, ObDMLRtCtx &das_rtctx, const ExprFixedArray &old_row) { + ObChunkDatumStore::StoredRow* stored_row = nullptr; return write_row_to_das_op(dlock_ctdef, dlock_rtdef, tablet_loc, das_rtctx, - old_row); + old_row, + stored_row); } int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, ObUpdRtDef &upd_rtdef, const ObDASTabletLoc *old_tablet_loc, const ObDASTabletLoc *new_tablet_loc, - ObDMLRtCtx &dml_rtctx) + ObDMLRtCtx &dml_rtctx, + ObChunkDatumStore::StoredRow* &old_row, + ObChunkDatumStore::StoredRow* &new_row, + ObChunkDatumStore::StoredRow* &full_row) { int ret = OB_SUCCESS; ObPhysicalPlanCtx *plan_ctx = GET_PHY_PLAN_CTX(dml_rtctx.get_exec_ctx()); @@ -870,6 +875,7 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, LOG_WARN("check update new row tablet validity failed", K(ret)); } else if (OB_UNLIKELY(!upd_rtdef.is_row_changed_)) { //old row is equal to new row, only need to lock row + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_ISNULL(upd_rtdef.dlock_rtdef_)) { ObIAllocator &allocator = dml_rtctx.get_exec_ctx().get_allocator(); if (OB_FAIL(init_das_lock_rtdef_for_update(dml_rtctx, upd_ctdef, upd_rtdef))) { @@ -884,7 +890,8 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, *upd_rtdef.dlock_rtdef_, old_tablet_loc, dml_rtctx, - upd_ctdef.old_row_))) { + upd_ctdef.old_row_, + stored_row))) { LOG_WARN("write row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } } @@ -910,7 +917,8 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, *upd_rtdef.ddel_rtdef_, old_tablet_loc, dml_rtctx, - upd_ctdef.old_row_))) { + upd_ctdef.old_row_, + old_row))) { LOG_WARN("delete row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else if (upd_ctdef.is_heap_table_ && OB_FAIL(set_update_hidden_pk(dml_rtctx.get_eval_ctx(), @@ -921,7 +929,8 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, *upd_rtdef.dins_rtdef_, new_tablet_loc, dml_rtctx, - upd_ctdef.new_row_))) { + upd_ctdef.new_row_, + new_row))) { LOG_WARN("insert row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else { LOG_DEBUG("update pkey changed", K(ret), KPC(old_tablet_loc), KPC(new_tablet_loc), @@ -933,7 +942,8 @@ int ObDMLService::update_row(const ObUpdCtDef &upd_ctdef, upd_rtdef.dupd_rtdef_, old_tablet_loc, dml_rtctx, - upd_ctdef.full_row_))) { + upd_ctdef.full_row_, + full_row))) { LOG_WARN("write row to das op failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } else { LOG_DEBUG("update pkey not changed", K(ret), KPC(old_tablet_loc), @@ -949,24 +959,28 @@ int ObDMLService::update_row(const ObDASUpdCtDef &ctdef, ObDMLRtCtx &dml_rtctx, const ExprFixedArray &full_row) { + ObChunkDatumStore::StoredRow* stored_row = nullptr; return write_row_to_das_op(ctdef, rtdef, tablet_loc, dml_rtctx, - full_row); + full_row, + stored_row); } int ObDMLService::delete_row(const ObDASDelCtDef &das_del_ctdef, ObDASDelRtDef &das_del_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &das_rtctx, - const ExprFixedArray &old_row) + const ExprFixedArray &old_row, + ObChunkDatumStore::StoredRow* &stored_row) { return write_row_to_das_op(das_del_ctdef, das_del_rtdef, tablet_loc, das_rtctx, - old_row); + old_row, + stored_row); } int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef, @@ -1305,7 +1319,6 @@ int ObDMLService::init_das_lock_rtdef_for_update(ObDMLRtCtx &dml_rtctx, } return ret; } - int ObDMLService::init_lock_rtdef(ObDMLRtCtx &dml_rtctx, const ObLockCtDef &lock_ctdef, ObLockRtDef &lock_rtdef, @@ -1321,7 +1334,8 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, ObDASDMLBaseRtDef &rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - const ExprFixedArray &row) + const ExprFixedArray &row, + ObChunkDatumStore::StoredRow* &stored_row) { int ret = OB_SUCCESS; bool need_retry = false; @@ -1355,35 +1369,20 @@ int ObDMLService::write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, } //2. try add row to das dml buffer if (OB_SUCC(ret)) { - int64_t simulate_row_cnt = - EVENT_CALL(EventTable::EN_DAS_DML_BUFFER_OVERFLOW); - if (OB_UNLIKELY(simulate_row_cnt > 0 && dml_op->get_row_cnt() >= simulate_row_cnt)) { - buffer_full = true; - } else if (OB_FAIL(dml_op->write_row(row, dml_rtctx.get_eval_ctx(), buffer_full))) { + if (OB_FAIL(dml_op->write_row(row, dml_rtctx.get_eval_ctx(), stored_row, buffer_full))) { LOG_WARN("insert row to das dml op buffer failed", K(ret), K(ctdef), K(rtdef)); } LOG_DEBUG("write row to das op", K(ret), K(buffer_full), "op_type", N, "table_id", ctdef.table_id_, "index_tid", ctdef.index_tid_, "row", ROWEXPR2STR(dml_rtctx.get_eval_ctx(), row)); } - //3. if buffer is full, flush das task, and retry to add row + //3. if buffer is full, frozen node, create a new das op to add row if (OB_SUCC(ret) && buffer_full) { need_retry = true; if (REACH_COUNT_INTERVAL(10)) { // print log per 10 times. LOG_INFO("DAS write buffer full, ", K(dml_op->get_row_cnt()), K(dml_rtctx.das_ref_.get_das_mem_used())); } - if (OB_UNLIKELY(dml_rtctx.need_non_sub_full_task())) { - // 因为replace into 和 insert up在做try_insert时,需要返回duplicated row, - // 所以写满的das task现在不能提交,先frozen das task list, - // 等所有insert_row全部写完之后再统一提交 - dml_rtctx.das_ref_.set_frozen_node(); - } else { - if (dml_rtctx.need_pick_del_task_first() && - OB_FAIL(dml_rtctx.das_ref_.pick_del_task_to_first())) { - LOG_WARN("fail to pick delete das task to first", K(ret)); - } else if (OB_FAIL(dml_rtctx.op_.submit_all_dml_task())) { - LOG_WARN("submit all dml task failed", K(ret)); - } - } + dml_rtctx.das_ref_.set_frozen_node(); } } while (OB_SUCC(ret) && need_retry); return ret; @@ -1805,5 +1804,91 @@ int ObDMLService::get_nested_dup_table_ctx(const uint64_t table_id, DASDelCtxLi } return ret; } + +int ObDMLService::handle_after_row_processing_batch(ObDMLModifyRowsList *dml_modify_rows) +{ + int ret = OB_SUCCESS; + ObDMLModifyRowsList::iterator row_iter = dml_modify_rows->begin(); + for (; OB_SUCC(ret) && row_iter != dml_modify_rows->end(); row_iter++) { + ObDMLModifyRowNode &modify_row = *row_iter; + if (OB_ISNULL(modify_row.dml_op_) || OB_ISNULL(modify_row.dml_ctdef_) || OB_ISNULL(modify_row.dml_rtdef_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid parameter for batch post row processing", K(ret)); + } else { + ObTableModifyOp &op = *modify_row.dml_op_; + const ObDMLBaseCtDef &dml_ctdef = *modify_row.dml_ctdef_; + ObDMLBaseRtDef &dml_rtdef = *modify_row.dml_rtdef_; + // process foreign key + if (OB_NOT_NULL(modify_row.full_row_) && OB_FAIL(modify_row.full_row_->to_expr(modify_row.dml_ctdef_->full_row_, op.get_eval_ctx()))) { + LOG_WARN("failed to covert stored full row to expr", K(ret)); + } else if (OB_NOT_NULL(modify_row.old_row_) && OB_FAIL(modify_row.old_row_->to_expr(dml_ctdef.old_row_, op.get_eval_ctx()))) { + LOG_WARN("failed to covert stored old row to expr", K(ret)); + } else if (OB_NOT_NULL(modify_row.new_row_) && OB_FAIL(modify_row.new_row_->to_expr(dml_ctdef.new_row_, op.get_eval_ctx()))) { + LOG_WARN("failed to covert stored new row to expr", K(ret)); + } else if (OB_FAIL(ForeignKeyHandle::do_handle(op, dml_ctdef, dml_rtdef))) { + LOG_WARN("failed to handle foreign key constraints", K(ret)); + } + // process after row trigger + const ObDmlEventType t_insert = ObDmlEventType::DE_INSERTING; + const ObDmlEventType t_update = ObDmlEventType::DE_UPDATING; + const ObDmlEventType t_delete = ObDmlEventType::DE_DELETING; + const ObDmlEventType dml_event = modify_row.dml_event_; + if (OB_SUCC(ret)) { + ObEvalCtx &eval_ctx = op.get_eval_ctx(); + if (dml_event != t_insert && dml_event != t_update && dml_event != t_delete) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid trigger event", K(ret)); + } else if (t_insert == dml_event && OB_FAIL(TriggerHandle::init_param_new_row( + eval_ctx, dml_ctdef.trig_ctdef_, dml_rtdef.trig_rtdef_))) { + LOG_WARN("failed to init trigger parameter for new row", K(ret)); + } else if (t_delete == dml_event && OB_FAIL(TriggerHandle::init_param_old_row( + eval_ctx, dml_ctdef.trig_ctdef_, dml_rtdef.trig_rtdef_))) { + LOG_WARN("failed to init trigger parameter for old row", K(ret)); + } else if (t_update == dml_event && OB_FAIL(TriggerHandle::init_param_rows( + eval_ctx, dml_ctdef.trig_ctdef_, dml_rtdef.trig_rtdef_))) { + LOG_WARN("failed to init trigger parameter for old row and new row", K(ret)); + } else if (OB_FAIL(TriggerHandle::do_handle_after_row(op, dml_ctdef.trig_ctdef_, dml_rtdef.trig_rtdef_, dml_event))) { + LOG_WARN("failed to handle after trigger", K(ret)); + } + } + } + } + return ret; +} + +int ObDMLService::handle_after_row_processing(ObDMLModifyRowsList *dml_modify_rows) +{ + int ret = OB_SUCCESS; + if (1 < dml_modify_rows->size()) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("the number of rows in the list is more than 1", K(ret), K(dml_modify_rows->size())); + } else if (1 == dml_modify_rows->size()) { + // for single-row processing, the expr defined in ctdef and trig parameters haven't been refreshed + // Therefore, there is no need to re-convert the rows to be modified into an expression and init trig parameters + ObDMLModifyRowsList::iterator row_iter = dml_modify_rows->begin(); + ObDMLModifyRowNode &modify_row = *row_iter; + if (OB_ISNULL(modify_row.dml_op_) || OB_ISNULL(modify_row.dml_ctdef_) || OB_ISNULL(modify_row.dml_rtdef_)) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid parameter for batch post row processing", K(ret)); + } else { + ObTableModifyOp &op = *modify_row.dml_op_; + const ObDMLBaseCtDef &dml_ctdef = *modify_row.dml_ctdef_; + ObDMLBaseRtDef &dml_rtdef = *modify_row.dml_rtdef_; + const ObDmlEventType t_insert = ObDmlEventType::DE_INSERTING; + const ObDmlEventType t_update = ObDmlEventType::DE_UPDATING; + const ObDmlEventType t_delete = ObDmlEventType::DE_DELETING; + const ObDmlEventType dml_event = modify_row.dml_event_; + if (dml_event != t_insert && dml_event != t_update && dml_event != t_delete) { + ret = OB_ERR_UNEXPECTED; + LOG_WARN("invalid trigger event", K(ret)); + } else if (OB_FAIL(ForeignKeyHandle::do_handle(op, dml_ctdef, dml_rtdef))) { + LOG_WARN("failed to handle foreign key constraints", K(ret)); + } else if (OB_FAIL(TriggerHandle::do_handle_after_row(op, dml_ctdef.trig_ctdef_, dml_rtdef.trig_rtdef_, dml_event))) { + LOG_WARN("failed to handle after trigger", K(ret)); + } + } + } + return ret; +} } // namespace sql } // namespace oceanbase diff --git a/src/sql/engine/dml/ob_dml_service.h b/src/sql/engine/dml/ob_dml_service.h index 0f86eaf37b..24c1a140e0 100644 --- a/src/sql/engine/dml/ob_dml_service.h +++ b/src/sql/engine/dml/ob_dml_service.h @@ -14,6 +14,7 @@ #define DEV_SRC_SQL_ENGINE_DML_OB_DML_SERVICE_H_ #include "sql/engine/dml/ob_dml_ctx_define.h" #include "sql/das/ob_das_context.h" +#include "ob_table_modify_op.h" namespace oceanbase { namespace sql @@ -86,16 +87,19 @@ public: static int insert_row(const ObInsCtDef &ins_ctdef, ObInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, - ObDMLRtCtx &dml_rtctx); + ObDMLRtCtx &dml_rtctx, + ObChunkDatumStore::StoredRow* &stored_row); static int insert_row(const ObDASInsCtDef &ins_ctdef, ObDASInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &das_rtctx, - const ExprFixedArray &new_row); + const ExprFixedArray &new_row, + ObChunkDatumStore::StoredRow* &stored_row); static int delete_row(const ObDelCtDef &del_ctdef, ObDelRtDef &del_rtdef, const ObDASTabletLoc *tablet_loc, - ObDMLRtCtx &dml_rtctx); + ObDMLRtCtx &dml_rtctx, + ObChunkDatumStore::StoredRow* &stored_row); static int update_row(const ObDASUpdCtDef &ctdef, ObDASUpdRtDef &rtdef, const ObDASTabletLoc *tablet_loc, @@ -105,13 +109,17 @@ public: ObUpdRtDef &upd_rtdef, const ObDASTabletLoc *old_tablet_loc, const ObDASTabletLoc *new_tablet_loc, - ObDMLRtCtx &dml_rtctx); + ObDMLRtCtx &dml_rtctx, + ObChunkDatumStore::StoredRow* &old_row, + ObChunkDatumStore::StoredRow* &new_row, + ObChunkDatumStore::StoredRow* &full_row); static int delete_row(const ObDASDelCtDef &ctdef, ObDASDelRtDef &rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &das_rtctx, - const ExprFixedArray &old_row); + const ExprFixedArray &old_row, + ObChunkDatumStore::StoredRow* &stored_row); static int lock_row(const ObDASLockCtDef &dlock_ctdef, ObDASLockRtDef &dlock_rtdef, @@ -215,14 +223,16 @@ public: static int get_nested_dup_table_ctx(const uint64_t table_id, DASDelCtxList& del_ctx_list, SeRowkeyDistCtx* &rowkey_dist_ctx); - + static int handle_after_row_processing(ObDMLModifyRowsList *dml_modify_rows); + static int handle_after_row_processing_batch(ObDMLModifyRowsList *dml_modify_rows); private: template static int write_row_to_das_op(const ObDASDMLBaseCtDef &ctdef, ObDASDMLBaseRtDef &rtdef, const ObDASTabletLoc *tablet_loc, ObDMLRtCtx &dml_rtctx, - const ExprFixedArray &row); + const ExprFixedArray &row, + ObChunkDatumStore::StoredRow* &stored_row); template static const ObDASTableLocMeta *get_table_loc_meta(const T *multi_ctdef); static int check_nested_sql_legality(ObExecContext &ctx, common::ObTableID ref_table_id); diff --git a/src/sql/engine/dml/ob_table_delete_op.cpp b/src/sql/engine/dml/ob_table_delete_op.cpp index 4e4689f4f4..50a8a7c5f3 100644 --- a/src/sql/engine/dml/ob_table_delete_op.cpp +++ b/src/sql/engine/dml/ob_table_delete_op.cpp @@ -134,6 +134,22 @@ OB_INLINE int ObTableDeleteOp::inner_open_with_das() return ret; } +int ObTableDeleteOp::check_need_exec_single_row() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.del_ctdefs_.count() && !execute_single_row_; ++i) { + const ObTableDeleteSpec::DelCtDefArray &ctdefs = MY_SPEC.del_ctdefs_.at(i); + const ObDelCtDef &del_ctdef = *ctdefs.at(0); + const ObForeignKeyArgArray &fk_args = del_ctdef.fk_args_; + for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) { + if (fk_args.at(j).is_self_ref_ && fk_args.at(j).ref_action_ == ACTION_CASCADE) { + execute_single_row_ = true; + } + } + } + return ret; +} + OB_INLINE int ObTableDeleteOp::open_table_for_each() { int ret = OB_SUCCESS; @@ -238,6 +254,7 @@ OB_INLINE int ObTableDeleteOp::delete_row_to_das() const ObDelCtDef &del_ctdef = *ctdefs.at(j); ObDelRtDef &del_rtdef = rtdefs.at(j); ObDASTabletLoc *tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, &del_ctdef, &del_rtdef, ObDmlEventType::DE_DELETING); bool is_skipped = false; if (OB_FAIL(ObDMLService::process_delete_row(del_ctdef, del_rtdef, is_skipped, *this))) { LOG_WARN("process delete row failed", K(ret)); @@ -247,8 +264,10 @@ OB_INLINE int ObTableDeleteOp::delete_row_to_das() break; } else if (OB_FAIL(calc_tablet_loc(del_ctdef, del_rtdef, tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); - } else if (OB_FAIL(ObDMLService::delete_row(del_ctdef, del_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::delete_row(del_ctdef, del_rtdef, tablet_loc, dml_rtctx_, modify_row.old_row_))) { LOG_WARN("insert row with das failed", K(ret)); + } else if (need_after_row_process(del_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } else if (!MY_SPEC.del_ctdefs_.at(0).at(0)->has_instead_of_trigger_) { ++del_rtdef.cur_row_num_; } diff --git a/src/sql/engine/dml/ob_table_delete_op.h b/src/sql/engine/dml/ob_table_delete_op.h index 390f05a3d7..1ef85c91ca 100644 --- a/src/sql/engine/dml/ob_table_delete_op.h +++ b/src/sql/engine/dml/ob_table_delete_op.h @@ -93,6 +93,7 @@ protected: int close_table_for_each(); int check_delete_affected_row(); virtual int write_row_to_das_buffer() override; + virtual int check_need_exec_single_row() override; protected: DelRtDef2DArray del_rtdefs_; //see the comment of DelCtDef2DArray ObErrLogService err_log_service_; diff --git a/src/sql/engine/dml/ob_table_insert_all_op.cpp b/src/sql/engine/dml/ob_table_insert_all_op.cpp index a69af536b3..16a0b3fa5b 100644 --- a/src/sql/engine/dml/ob_table_insert_all_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_all_op.cpp @@ -100,6 +100,29 @@ int ObTableInsertAllOp::switch_iterator(ObExecContext &ctx) return common::OB_ITER_END; } +// If there are foreign key dependencies between the tables to be inserted, a single row is required +int ObTableInsertAllOp::check_need_exec_single_row() +{ + int ret = OB_SUCCESS; + for (int64_t i = 0; OB_SUCC(ret) && i < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++i) { + const ObTableInsertSpec::InsCtDefArray &ctdefs = MY_SPEC.ins_ctdefs_.at(i); + const ObInsCtDef &ins_ctdef = *(ctdefs.at(0)); + const uint64_t table_id = ins_ctdef.das_base_ctdef_.index_tid_; + const ObForeignKeyArgArray &fk_args = ins_ctdef.fk_args_; + for (int j = 0; OB_SUCC(ret) && j < fk_args.count() && !execute_single_row_; j++) { + const ObForeignKeyArg &fk_arg = fk_args.at(j); + const uint64_t parent_table_id = fk_arg.table_id_; + for (int k = 0; k < MY_SPEC.ins_ctdefs_.count() && !execute_single_row_; ++k) { + const uint64_t tmp_table_id = MY_SPEC.ins_ctdefs_.at(k).at(0)->das_base_ctdef_.index_tid_; + if (parent_table_id == tmp_table_id && k != i) { + execute_single_row_ = true; + } + } + } + } + return ret; +} + int ObTableInsertAllOp::write_row_to_das_buffer() { int ret = OB_SUCCESS; @@ -130,6 +153,7 @@ int ObTableInsertAllOp::write_row_to_das_buffer() const ObInsCtDef &ins_ctdef = *(ctdefs.at(j)); ObInsRtDef &ins_rtdef = rtdefs.at(j); ObDASTabletLoc *tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, &ins_ctdef, &ins_rtdef, ObDmlEventType::DE_INSERTING); ++ins_rtdef.cur_row_num_; if (OB_FAIL(ObDMLService::init_heap_table_pk_for_ins(ins_ctdef, eval_ctx_))) { LOG_WARN("fail to init heap table pk to null", K(ret)); @@ -145,18 +169,20 @@ int ObTableInsertAllOp::write_row_to_das_buffer() LOG_WARN("calc partition key failed", K(ret)); } else if (OB_FAIL(ObDMLService::set_heap_table_hidden_pk(ins_ctdef, tablet_loc->tablet_id_, eval_ctx_))) { LOG_WARN("set_heap_table_hidden_pk failed", K(ret), KPC(tablet_loc)); - } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_, modify_row.new_row_))) { LOG_WARN("insert row with das failed", K(ret)); // TODO(yikang): fix trigger related for heap table - } else if (ins_ctdef.is_primary_index_ && OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef.trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); + } else if (need_after_row_process(ins_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } else { have_insert_row = true; } } // end for global index ctdef loop + + // NOTE: for insert all into t1,t2, t1 is the parent table of t2, Single-line execution is required to ensure oracle compatibility + if (OB_SUCC(ret) && execute_single_row_ && OB_FAIL(submit_all_dml_task())) { + LOG_WARN("failed to push dml task", K(ret)); + } } } // end for table ctdef loop //erro logging not support, fix it later diff --git a/src/sql/engine/dml/ob_table_insert_all_op.h b/src/sql/engine/dml/ob_table_insert_all_op.h index ff3dd5826f..b7da1c4fb9 100644 --- a/src/sql/engine/dml/ob_table_insert_all_op.h +++ b/src/sql/engine/dml/ob_table_insert_all_op.h @@ -78,6 +78,7 @@ protected: virtual int inner_close() override; protected: virtual int write_row_to_das_buffer() override; + virtual int check_need_exec_single_row() override; private: int check_match_conditions(const int64_t tbl_idx, const bool have_insert_row, diff --git a/src/sql/engine/dml/ob_table_insert_op.cpp b/src/sql/engine/dml/ob_table_insert_op.cpp index 3d418e84ae..2b93e58992 100644 --- a/src/sql/engine/dml/ob_table_insert_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_op.cpp @@ -227,6 +227,7 @@ OB_INLINE int ObTableInsertOp::insert_row_to_das() const ObInsCtDef &ins_ctdef = *(ctdefs.at(j)); ObInsRtDef &ins_rtdef = rtdefs.at(j); ObDASTabletLoc *tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, &ins_ctdef, &ins_rtdef, ObDmlEventType::DE_INSERTING); if (!MY_SPEC.ins_ctdefs_.at(0).at(0)->has_instead_of_trigger_) { ++ins_rtdef.cur_row_num_; } @@ -246,21 +247,16 @@ OB_INLINE int ObTableInsertOp::insert_row_to_das() tablet_loc->tablet_id_, eval_ctx_))) { LOG_WARN("set_heap_table_hidden_pk failed", K(ret), KPC(tablet_loc)); - } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_, modify_row.new_row_))) { LOG_WARN("insert row with das failed", K(ret)); // TODO(yikang): fix trigger related for heap table - } else if (ins_ctdef.is_primary_index_ && - OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef.trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); + } else if (need_after_row_process(ins_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } if (OB_FAIL(ret)) { record_err_for_load_data(ret, ins_rtdef.cur_row_num_); } } // end for global index ctdef loop - if (OB_SUCC(ret)) { int64_t insert_rows = is_skipped ? 0 : 1; if (OB_FAIL(merge_implict_cursor(insert_rows, 0, 0, 0))) { diff --git a/src/sql/engine/dml/ob_table_insert_up_op.cpp b/src/sql/engine/dml/ob_table_insert_up_op.cpp index 6bf0d3fca4..9e4e48afb3 100644 --- a/src/sql/engine/dml/ob_table_insert_up_op.cpp +++ b/src/sql/engine/dml/ob_table_insert_up_op.cpp @@ -315,6 +315,7 @@ int ObTableInsertUpOp::do_insert_up_cache() } else if (constraint_values.empty()) { // do insert ObChunkDatumStore::StoredRow *insert_new_row = NULL; + ObDMLModifyRowNode modify_row(this, &ins_ctdef, &ins_rtdef, ObDmlEventType::DE_INSERTING); if (is_ignore_) { ret = OB_ERR_UNEXPECTED; LOG_WARN("ignore is unexpected", K(ret), KPC(insert_row)); @@ -322,11 +323,6 @@ int ObTableInsertUpOp::do_insert_up_cache() LOG_WARN("fail to do process insert", K(ret), K(ins_ctdef), "insert_row", ROWEXPR2STR(eval_ctx_, get_primary_table_insert_row())); // TODO(yikang): fix trigger related for heap table - } else if (ins_ctdef.is_primary_index_ && OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef.trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); } else if (OB_FAIL(conflict_checker_.convert_exprs_to_stored_row(get_primary_table_insert_row(), insert_new_row))) { LOG_WARN("convert expr to stored row failed", K(ret), @@ -335,7 +331,12 @@ int ObTableInsertUpOp::do_insert_up_cache() LOG_WARN("fail to insert row", K(ret), "insert_row", ROWEXPR2STR(eval_ctx_, get_primary_table_insert_row())); } else { - insert_rows_++; + modify_row.new_row_ = insert_new_row; + if (need_after_row_process(ins_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); + } else { + insert_rows_++; + } } } else { // do update @@ -344,6 +345,7 @@ int ObTableInsertUpOp::do_insert_up_cache() // clear_evaluated_flag, 并且将数据重新flush到insert_row中 ObChunkDatumStore::StoredRow *upd_new_row = NULL; const ObChunkDatumStore::StoredRow *upd_old_row = constraint_values.at(0).current_datum_row_; + ObDMLModifyRowNode modify_row(this, &upd_ctdef, &upd_rtdef, ObDmlEventType::DE_UPDATING); clear_evaluated_flag(); if (OB_FAIL(insert_row->to_expr(MY_SPEC.all_saved_exprs_, eval_ctx_))) { LOG_WARN("insert_row to expr failed", K(ret), KPC(insert_row), @@ -368,13 +370,6 @@ int ObTableInsertUpOp::do_insert_up_cache() } else if (upd_ctdef.is_heap_table_ && OB_FAIL(set_heap_table_new_pk(upd_ctdef, upd_rtdef))) { LOG_WARN("set heap table hidden_pk failed", K(ret), K(upd_ctdef)); - } else if (!is_skipped && upd_rtdef.is_row_changed_ - && OB_FAIL(TriggerHandle::do_handle_after_row(*this, - upd_ctdef.trig_ctdef_, - upd_rtdef.trig_rtdef_, - ObTriggerEvents::get_update_event()))) { - //兼容mysql如果行没变化,不执行行后trigger - LOG_WARN("failed to handle after trigger", K(ret)); } else if (OB_FAIL(conflict_checker_.convert_exprs_to_stored_row(get_primary_table_upd_new_row(), upd_new_row))) { LOG_WARN("convert expr to stored row failed", K(ret), "exprs", get_primary_table_upd_old_row()); @@ -394,8 +389,14 @@ int ObTableInsertUpOp::do_insert_up_cache() LOG_WARN("fail to update row in conflict_checker", K(ret), KPC(upd_new_row), KPC(upd_old_row)); } else { - insert_rows_++; - upd_changed_rows_++; + modify_row.old_row_ = const_cast(upd_old_row); + modify_row.new_row_ = upd_new_row; + if (need_after_row_process(upd_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); + } else { + insert_rows_++; + upd_changed_rows_++; + } } } else { // create table t1(c1 int primary key, c2 timestamp default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP); @@ -419,7 +420,24 @@ int ObTableInsertUpOp::insert_row_to_das(const ObInsCtDef &ins_ctdef, const ObDASTabletLoc *tablet_loc) { int ret = OB_SUCCESS; - if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_))) { + ObChunkDatumStore::StoredRow* stored_row = nullptr; + if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_, stored_row))) { + LOG_WARN("insert row with das failed", K(ret)); + } else { + LOG_TRACE("insert one row", KPC(tablet_loc), + "insert row", ROWEXPR2STR(eval_ctx_, ins_ctdef.new_row_)); + } + + return ret; +} + +int ObTableInsertUpOp::insert_row_to_das(const ObInsCtDef &ins_ctdef, + ObInsRtDef &ins_rtdef, + const ObDASTabletLoc *tablet_loc, + ObDMLModifyRowNode &modify_row) +{ + int ret = OB_SUCCESS; + if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_, modify_row.new_row_))) { LOG_WARN("insert row with das failed", K(ret)); } else { LOG_TRACE("insert one row", KPC(tablet_loc), @@ -441,6 +459,7 @@ int ObTableInsertUpOp::try_insert_row() ObInsertUpRtDef &insert_up_rtdef = insert_up_rtdefs_.at(i); ObInsRtDef &ins_rtdef = insert_up_rtdef.ins_rtdef_; ObDASTabletLoc *tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, const_cast(&ins_ctdef), &ins_rtdef, ObDmlEventType::DE_INSERTING); ++ins_rtdef.cur_row_num_; if (OB_FAIL(ObDMLService::process_insert_row(ins_ctdef, ins_rtdef, *this, is_skipped))) { LOG_WARN("process insert row failed", K(ret)); @@ -451,14 +470,11 @@ int ObTableInsertUpOp::try_insert_row() } else if (ins_ctdef.is_heap_table_ && OB_FAIL(ObDMLService::set_heap_table_hidden_pk(ins_ctdef, tablet_loc->tablet_id_, eval_ctx_))) { LOG_WARN("set_heap_table_hidden_pk failed", K(ret), KPC(tablet_loc)); - } else if (OB_FAIL(insert_row_to_das(ins_ctdef, ins_rtdef, tablet_loc))) { + } else if (OB_FAIL(insert_row_to_das(ins_ctdef, ins_rtdef, tablet_loc, modify_row))) { LOG_WARN("insert row with das failed", K(ret)); // TODO(yikang): fix trigger related for heap table - } else if (ins_ctdef.is_primary_index_ && OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef.trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); + } else if (need_after_row_process(ins_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } } return ret; @@ -580,7 +596,7 @@ int ObTableInsertUpOp::delete_one_upd_old_row_das(const ObUpdCtDef &upd_ctdef, LOG_WARN("init das dml rtdef failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } } - + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(ret)) { //do nothing } else if (OB_ISNULL(upd_rtdef.ddel_rtdef_)) { @@ -596,7 +612,8 @@ int ObTableInsertUpOp::delete_one_upd_old_row_das(const ObUpdCtDef &upd_ctdef, *upd_rtdef.ddel_rtdef_, tablet_loc, upd_rtctx_, - upd_ctdef.old_row_))) { + upd_ctdef.old_row_, + stored_row))) { LOG_WARN("delete row with das failed", K(ret)); } else { LOG_DEBUG("delete upd old_row", KPC(tablet_loc), "upd old_row", @@ -621,7 +638,7 @@ int ObTableInsertUpOp::insert_one_upd_new_row_das(const ObUpdCtDef &upd_ctdef, LOG_WARN("init das dml rtdef failed", K(ret), K(upd_ctdef), K(upd_rtdef)); } } - + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(ret)) { //do nothing } else if (OB_ISNULL(upd_rtdef.dins_rtdef_)) { @@ -637,7 +654,8 @@ int ObTableInsertUpOp::insert_one_upd_new_row_das(const ObUpdCtDef &upd_ctdef, *upd_rtdef.dins_rtdef_, tablet_loc, upd_rtctx_, - upd_ctdef.new_row_))) { + upd_ctdef.new_row_, + stored_row))) { LOG_WARN("insert row with das failed", K(ret)); } else { LOG_DEBUG("ins upd new_row", KPC(tablet_loc), "upd new_row", @@ -801,6 +819,8 @@ int ObTableInsertUpOp::do_insert_up() LOG_WARN("fail to load all row", K(ret)); } else if (OB_FAIL(post_all_dml_das_task(dml_rtctx_, false))) { LOG_WARN("fail to post all das task", K(ret)); + } else if (!check_is_duplicated() && OB_FAIL(ObDMLService::handle_after_row_processing_batch(&dml_modify_rows_))) { + LOG_WARN("try insert is not duplicated, failed to process foreign key handle", K(ret)); } else if (!check_is_duplicated()) { insert_rows_ += insert_rows; LOG_TRACE("try insert is not duplicated", K(ret), K(insert_rows_)); @@ -823,6 +843,8 @@ int ObTableInsertUpOp::do_insert_up() LOG_WARN("do insert rows post process failed", K(ret)); } else if (OB_FAIL(post_all_dml_das_task(dml_rtctx_, false))) { LOG_WARN("do insert rows post process failed", K(ret)); + } else if (OB_FAIL(ObDMLService::handle_after_row_processing_batch(&dml_modify_rows_))) { + LOG_WARN("try insert is duplicated, failed to process foreign key handle", K(ret)); } if (OB_SUCC(ret) && !is_iter_end) { @@ -999,7 +1021,6 @@ int ObTableInsertUpOp::prepare_final_insert_up_task() OZ(do_insert(constraint_value)); } } - return ret; } @@ -1073,6 +1094,7 @@ int ObTableInsertUpOp::do_update_with_ignore() ObUpdRtDef &upd_rtdef = insert_up_rtdef.upd_rtdef_; ObDASTabletLoc *old_tablet_loc = nullptr; ObDASTabletLoc *new_tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, (upd_ctdef), &upd_rtdef, ObDmlEventType::DE_UPDATING); if (MY_SPEC.insert_up_ctdefs_.count() > 1) { ret = OB_NOT_SUPPORTED; LOG_WARN("not support global index with ignore", K(ret)); @@ -1083,8 +1105,11 @@ int ObTableInsertUpOp::do_update_with_ignore() ret = OB_NOT_SUPPORTED; LOG_WARN("ignore not supported", K(ret), KPC(old_tablet_loc), KPC(new_tablet_loc)); LOG_USER_ERROR(OB_NOT_SUPPORTED, "Do update with ignore under inconsistent tablet loc"); - } else if (OB_FAIL(ObDMLService::update_row(*upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::update_row(*upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc, dml_rtctx_, + modify_row.old_row_, modify_row.new_row_, modify_row.full_row_))) { LOG_WARN("fail to insert update_row to das", K(ret)); + } else if (need_after_row_process(*upd_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } return ret; } @@ -1124,6 +1149,7 @@ int ObTableInsertUpOp::reset_das_env() LOG_WARN("close all das task failed", K(ret)); } else { dml_rtctx_.das_ref_.reuse(); + dml_modify_rows_.clear(); } // 因为第二次插入不需要fetch conflict result了,如果有conflict @@ -1158,6 +1184,7 @@ int ObTableInsertUpOp::reuse() LOG_WARN("fail to reuse conflict checker", K(ret)); } else { insert_up_row_store_.reset(); + dml_modify_rows_.clear(); } } diff --git a/src/sql/engine/dml/ob_table_insert_up_op.h b/src/sql/engine/dml/ob_table_insert_up_op.h index efa2b3131b..d501769cda 100644 --- a/src/sql/engine/dml/ob_table_insert_up_op.h +++ b/src/sql/engine/dml/ob_table_insert_up_op.h @@ -157,6 +157,10 @@ protected: ObInsRtDef &ins_rtdef, const ObDASTabletLoc *tablet_loc); + int insert_row_to_das(const ObInsCtDef &ins_ctdef, + ObInsRtDef &ins_rtdef, + const ObDASTabletLoc *tablet_loc, + ObDMLModifyRowNode &modify_row); int calc_update_tablet_loc(const ObUpdCtDef &upd_ctdef, ObUpdRtDef &upd_rtdef, diff --git a/src/sql/engine/dml/ob_table_lock_op.cpp b/src/sql/engine/dml/ob_table_lock_op.cpp index d2dca49cfe..986448bad8 100644 --- a/src/sql/engine/dml/ob_table_lock_op.cpp +++ b/src/sql/engine/dml/ob_table_lock_op.cpp @@ -212,10 +212,8 @@ int ObTableLockOp::inner_get_next_row() if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { //DML operator reach iter end, //now submit the remaining rows in the DAS Write Buffer to the storage - if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { - LOG_WARN("execute all dml das task failed", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { - LOG_WARN("close all das task failed", K(ret)); + if (OB_FAIL(submit_all_dml_task())) { + LOG_WARN("failed to submit the remaining dml tasks", K(ret)); } //to post process the DML info after writing all data to the storage ret = write_rows_post_proc(ret); @@ -287,10 +285,8 @@ int ObTableLockOp::inner_get_next_batch(const int64_t max_row_cnt) if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { //DML operator reach iter end, //now submit the remaining rows in the DAS Write Buffer to the storage - if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { - LOG_WARN("execute all dml das task failed", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { - LOG_WARN("close all das task failed", K(ret)); + if (OB_FAIL(submit_all_dml_task())) { + LOG_WARN("failed to submit the remaining dml tasks", K(ret)); } //to post process the DML info after writing all data to the storage ret = write_rows_post_proc(ret); diff --git a/src/sql/engine/dml/ob_table_merge_op.cpp b/src/sql/engine/dml/ob_table_merge_op.cpp index 5e859ae3cc..48e333b2a4 100644 --- a/src/sql/engine/dml/ob_table_merge_op.cpp +++ b/src/sql/engine/dml/ob_table_merge_op.cpp @@ -459,6 +459,8 @@ int ObTableMergeOp::update_row_das() ObDASTabletLoc *old_tablet_loc = nullptr; ObDASTabletLoc *new_tablet_loc = nullptr; ObUpdRtDef &upd_rtdef = merge_rtdefs_.at(i).upd_rtdef_; + ObDMLModifyRowNode modify_row(this, merge_ctdef->upd_ctdef_, &upd_rtdef, ObDmlEventType::DE_UPDATING); + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_ISNULL(merge_ctdef)) { // merge_ctdef can't be NULL ret = OB_ERR_UNEXPECTED; @@ -475,11 +477,11 @@ int ObTableMergeOp::update_row_das() break; } else if (OB_FAIL(calc_update_tablet_loc(*upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); - } else if (OB_FAIL(TriggerHandle::do_handle_after_row( - *this, upd_ctdef->trig_ctdef_, upd_rtdef.trig_rtdef_, ObTriggerEvents::get_update_event()))) { - LOG_WARN("failed to handle after trigger", K(ret)); - } else if (OB_FAIL(ObDMLService::update_row(*upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::update_row(*upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc, dml_rtctx_, + modify_row.old_row_, modify_row.new_row_, modify_row.full_row_))) { LOG_WARN("insert row with das failed", K(ret)); + } else if (need_after_row_process(*upd_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } else { affected_rows_++; } @@ -497,6 +499,7 @@ int ObTableMergeOp::delete_row_das() bool is_skipped = false; ObDASTabletLoc *tablet_loc = nullptr; ObDelRtDef &del_rtdef = merge_rtdefs_.at(i).del_rtdef_; + ObDMLModifyRowNode modify_row(this, (merge_ctdef->del_ctdef_), &del_rtdef, ObDmlEventType::DE_DELETING); if (OB_ISNULL(merge_ctdef)) { // merge_ctdef can't be NULL ret = OB_ERR_UNEXPECTED; @@ -514,8 +517,10 @@ int ObTableMergeOp::delete_row_das() // 这里貌似不应该出现is_skipped == true的场景 } else if (OB_FAIL(calc_delete_tablet_loc(*del_ctdef, del_rtdef, tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); - } else if (OB_FAIL(ObDMLService::delete_row(*del_ctdef, del_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::delete_row(*del_ctdef, del_rtdef, tablet_loc, dml_rtctx_, modify_row.old_row_))) { LOG_WARN("insert row with das failed", K(ret)); + } else if (need_after_row_process(*del_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } } @@ -543,6 +548,7 @@ int ObTableMergeOp::do_insert() ObInsCtDef *ins_ctdef = NULL; ObInsRtDef &ins_rtdef = merge_rtdefs_.at(i).ins_rtdef_; ObDASTabletLoc *tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, (merge_ctdef->ins_ctdef_), &ins_rtdef, ObDmlEventType::DE_INSERTING); if (OB_ISNULL(merge_ctdef)) { // merge_ctdef can't be NULL ret = OB_ERR_UNEXPECTED; @@ -567,14 +573,11 @@ int ObTableMergeOp::do_insert() tablet_loc->tablet_id_, eval_ctx_))) { LOG_WARN("set_heap_table_hidden_pk failed", K(ret), KPC(tablet_loc), KPC(ins_ctdef)); - } else if (OB_FAIL(ObDMLService::insert_row(*ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::insert_row(*ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_, modify_row.new_row_))) { LOG_WARN("insert row with das failed", K(ret)); // TODO(yikang): fix trigger related for heap table - } else if (ins_ctdef->is_primary_index_ && OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef->trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); + } else if (need_after_row_process(*ins_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } } diff --git a/src/sql/engine/dml/ob_table_modify_op.cpp b/src/sql/engine/dml/ob_table_modify_op.cpp index 794c990317..9f7ffdf8ea 100644 --- a/src/sql/engine/dml/ob_table_modify_op.cpp +++ b/src/sql/engine/dml/ob_table_modify_op.cpp @@ -25,6 +25,7 @@ #include "lib/mysqlclient/ob_isql_client.h" #include "observer/ob_inner_sql_connection_pool.h" #include "lib/worker.h" +#include "share/ob_debug_sync.h" namespace oceanbase { @@ -153,11 +154,12 @@ int ForeignKeyHandle::check_exist(ObTableModifyOp &op, const ObExprPtrIArray &row, bool expect_zero) { + DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK); int ret = OB_SUCCESS; static const char *SELECT_FMT_MYSQL = - "select /*+ no_parallel */ 1 from `%.*s`.`%.*s` where %.*s limit 2 for update"; + "select /*+ no_parallel */ 1 from `%.*s`.`%.*s` where %.*s limit 2"; static const char *SELECT_FMT_ORACLE = - "select /*+ no_parallel */ 1 from \"%.*s\".\"%.*s\" where %.*s and rownum <= 2 for update"; + "select /*+ no_parallel */ 1 from \"%.*s\".\"%.*s\" where %.*s and rownum <= 2"; const char *select_fmt = lib::is_mysql_mode() ? SELECT_FMT_MYSQL : SELECT_FMT_ORACLE; ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR, OB_MALLOC_NORMAL_BLOCK_SIZE, @@ -246,7 +248,7 @@ int ForeignKeyHandle::check_exist(ObTableModifyOp &op, } else if (is_zero && !is_self_ref) { ret = OB_ERR_NO_REFERENCED_ROW; LOG_WARN("parent row is not exist", K(ret), K(fk_arg), K(row)); - } else if (!is_zero && (!is_self_ref || !is_affect_only_one)) { + } else if (!is_zero) { ret = OB_ERR_ROW_IS_REFERENCED; LOG_WARN("child row is exist", K(ret), K(fk_arg), K(row)); } @@ -524,7 +526,9 @@ ObTableModifyOp::ObTableModifyOp(ObExecContext &ctx, iter_end_(false), dml_rtctx_(eval_ctx_, ctx, *this), is_error_logging_(false), + execute_single_row_(false), err_log_rt_def_(), + dml_modify_rows_(ctx.get_allocator()), saved_session_(NULL) { obj_print_params_ = CREATE_OBJ_PRINT_PARAM(ctx_.get_my_session()); @@ -559,6 +563,8 @@ int ObTableModifyOp::inner_open() LOG_WARN("failed to open inner conn", K(ret)); } else if (OB_FAIL(calc_single_table_loc())) { LOG_WARN("calc single table loc failed", K(ret)); + } else if (OB_FAIL(check_need_exec_single_row())) { + LOG_WARN("failed to perform single row execution check", K(ret)); } else { init_das_dml_ctx(); } @@ -692,6 +698,7 @@ int ObTableModifyOp::inner_close() dml_rtctx_.das_ref_.reset(); } } + dml_modify_rows_.clear(); // Release the hash sets created at root ctx for delete distinct check if (OB_SUCC(ret) && get_exec_ctx().is_root_ctx()) { DASDelCtxList& del_ctx_list = get_exec_ctx().get_das_ctx().get_das_del_ctx_list(); @@ -736,6 +743,7 @@ int ObTableModifyOp::inner_rescan() } } if (OB_SUCC(ret)) { + dml_modify_rows_.clear(); if (OB_FAIL(calc_single_table_loc())) { LOG_WARN("calc single table loc failed", K(ret)); } @@ -743,6 +751,12 @@ int ObTableModifyOp::inner_rescan() return ret; } +int ObTableModifyOp::check_need_exec_single_row() { + int ret = OB_SUCCESS; + execute_single_row_ = false; + return ret; +} + int ObTableModifyOp::get_gi_task() { int ret = OB_SUCCESS; @@ -971,7 +985,6 @@ int ObTableModifyOp::check_stack() } return ret; } - OperatorOpenOrder ObTableModifyOp::get_operator_open_order() const { OperatorOpenOrder open_order = OPEN_CHILDREN_FIRST; @@ -1058,11 +1071,19 @@ int ObTableModifyOp::submit_all_dml_task() { int ret = OB_SUCCESS; if (dml_rtctx_.das_ref_.has_task()) { - if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { + if (dml_rtctx_.need_pick_del_task_first() && + OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) { + LOG_WARN("fail to pick delete das task to first", K(ret)); + } else if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { LOG_WARN("execute all dml das task failed", K(ret)); } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { LOG_WARN("close all das task failed", K(ret)); + } else if (!execute_single_row_ && OB_FAIL(ObDMLService::handle_after_row_processing_batch(&get_dml_modify_row_list()))) { + LOG_WARN("perform batch foreign key constraints and after row trigger failed", K(ret)); + } else if (execute_single_row_ && OB_FAIL(ObDMLService::handle_after_row_processing(&get_dml_modify_row_list()))) { + LOG_WARN("perform single row foreign key constraints and after row trigger failed", K(ret)); } else { + dml_modify_rows_.clear(); dml_rtctx_.reuse(); } } @@ -1075,9 +1096,9 @@ int ObTableModifyOp::submit_all_dml_task() int ObTableModifyOp::discharge_das_write_buffer() { int ret = OB_SUCCESS; - if (dml_rtctx_.das_ref_.get_das_mem_used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE) { - LOG_INFO("DASWriteBuffer full, now to write storage", - "buffer memory", dml_rtctx_.das_ref_.get_das_mem_used()); + if (dml_rtctx_.das_ref_.get_das_mem_used() >= das::OB_DAS_MAX_TOTAL_PACKET_SIZE || execute_single_row_) { + LOG_INFO("DASWriteBuffer full or need single row execution, now to write storage", + "buffer memory", dml_rtctx_.das_ref_.get_das_alloc().used(), K(execute_single_row_)); ret = submit_all_dml_task(); } return ret; @@ -1138,13 +1159,8 @@ int ObTableModifyOp::inner_get_next_row() if (OB_SUCC(ret) && iter_end_ && dml_rtctx_.das_ref_.has_task()) { //DML operator reach iter end, //now submit the remaining rows in the DAS Write Buffer to the storage - if (dml_rtctx_.need_pick_del_task_first() && - OB_FAIL(dml_rtctx_.das_ref_.pick_del_task_to_first())) { - LOG_WARN("pick delete das task to first failed", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.execute_all_task())) { - LOG_WARN("execute all dml das task failed", K(ret)); - } else if (OB_FAIL(dml_rtctx_.das_ref_.close_all_task())) { - LOG_WARN("close all das task failed", K(ret)); + if (OB_FAIL(submit_all_dml_task())) { + LOG_WARN("failed to submit the remaining dml tasks", K(ret)); } } //to post process the DML info after writing all data to the storage or returning one row diff --git a/src/sql/engine/dml/ob_table_modify_op.h b/src/sql/engine/dml/ob_table_modify_op.h index 3f06aac42f..3dffdb3449 100644 --- a/src/sql/engine/dml/ob_table_modify_op.h +++ b/src/sql/engine/dml/ob_table_modify_op.h @@ -22,6 +22,7 @@ namespace oceanbase { namespace sql { + class ForeignKeyHandle { public: @@ -34,6 +35,7 @@ public: static int do_handle(ObTableModifyOp &op, const ObDMLBaseCtDef &dml_ctdef, ObDMLBaseRtDef &dml_rtdef); + private: static int value_changed(ObTableModifyOp &op, const common::ObIArray &columns, @@ -198,6 +200,13 @@ public: bool is_fk_nested_session() { return ObSQLUtils::is_fk_nested_sql(&ctx_); } void set_foreign_key_checks() { foreign_key_checks_ = true; } bool need_foreign_key_checks() { return foreign_key_checks_; } + bool has_before_row_trigger(const ObDMLBaseCtDef &dml_ctdef) { return dml_ctdef.is_primary_index_ && dml_ctdef.trig_ctdef_.all_tm_points_.has_before_row(); } + bool has_after_row_trigger(const ObDMLBaseCtDef &dml_ctdef) { return dml_ctdef.is_primary_index_ && dml_ctdef.trig_ctdef_.all_tm_points_.has_after_row(); } + bool need_foreign_key_check(const ObDMLBaseCtDef &dml_ctdef) { return dml_ctdef.is_primary_index_ && dml_ctdef.fk_args_.count() > 0; } + bool need_after_row_process(const ObDMLBaseCtDef &dml_ctdef) { return need_foreign_key_check(dml_ctdef) || has_after_row_trigger(dml_ctdef); } + void set_execute_single_row() { execute_single_row_ = true; } + void unset_execute_single_row() { execute_single_row_ = false; } + bool get_execute_single_row() const { return execute_single_row_; } bool is_fk_root_session(); const ObObjPrintParams &get_obj_print_params() { return obj_print_params_; } int init_foreign_key_operation(); @@ -207,6 +216,8 @@ public: void clear_dml_evaluated_flag(); void clear_dml_evaluated_flag(int64_t parent_cnt, ObExpr **parent_exprs); void clear_dml_evaluated_flag(ObExpr *clear_expr); + + ObDMLModifyRowsList& get_dml_modify_row_list() { return dml_modify_rows_;} int submit_all_dml_task(); protected: OperatorOpenOrder get_operator_open_order() const; @@ -221,6 +232,7 @@ protected: virtual int inner_rescan() override; virtual int inner_get_next_row() override; + virtual int check_need_exec_single_row(); int get_next_row_from_child(); //Override this interface to complete the write semantics of the DML operator, //and write a row to the DAS Write Buffer according to the specific DML behavior @@ -251,8 +263,10 @@ public: bool iter_end_; ObDMLRtCtx dml_rtctx_; bool is_error_logging_; + bool execute_single_row_; ObErrLogRtDef err_log_rt_def_; ObSEArray trigger_clear_exprs_; + ObDMLModifyRowsList dml_modify_rows_; private: ObSQLSessionInfo::StmtSavedValue *saved_session_; char saved_session_buf_[sizeof(ObSQLSessionInfo::StmtSavedValue)] __attribute__((aligned (16)));; diff --git a/src/sql/engine/dml/ob_table_replace_op.cpp b/src/sql/engine/dml/ob_table_replace_op.cpp index 76c6063912..e6378735f3 100644 --- a/src/sql/engine/dml/ob_table_replace_op.cpp +++ b/src/sql/engine/dml/ob_table_replace_op.cpp @@ -287,6 +287,7 @@ int ObTableReplaceOp::insert_row_to_das(bool need_do_trigger) ObReplaceRtDef &replace_rtdef = replace_rtdefs_.at(i); ObInsRtDef &ins_rtdef = replace_rtdef.ins_rtdef_; ObDASTabletLoc *tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, &ins_ctdef, &ins_rtdef, ObDmlEventType::DE_INSERTING); ++ins_rtdef.cur_row_num_; if (need_do_trigger && OB_FAIL(ObDMLService::init_heap_table_pk_for_ins(ins_ctdef, eval_ctx_))) { @@ -302,15 +303,11 @@ int ObTableReplaceOp::insert_row_to_das(bool need_do_trigger) } else if (need_do_trigger && OB_FAIL(ObDMLService::set_heap_table_hidden_pk(ins_ctdef, tablet_loc->tablet_id_, eval_ctx_))) { LOG_WARN("set_heap_table_hidden_pk failed", K(ret), KPC(tablet_loc), K(ins_ctdef)); - } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::insert_row(ins_ctdef, ins_rtdef, tablet_loc, dml_rtctx_, modify_row.new_row_))) { LOG_WARN("insert row with das failed", K(ret)); // TODO(yikang): fix trigger related for heap table - } else if (need_do_trigger && ins_ctdef.is_primary_index_ - && OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef.trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); + } else if (need_do_trigger && need_after_row_process(ins_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } else { LOG_DEBUG("insert one row", KPC(tablet_loc), "ins row", ROWEXPR2STR(eval_ctx_, ins_ctdef.new_row_)); @@ -330,6 +327,7 @@ int ObTableReplaceOp::delete_row_to_das(bool need_do_trigger) const ObDelCtDef &del_ctdef = *(replace_ctdef.del_ctdef_); ObReplaceRtDef &replace_rtdef = replace_rtdefs_.at(i); ObDelRtDef &del_rtdef = replace_rtdef.del_rtdef_; + ObChunkDatumStore::StoredRow *stored_row = nullptr; if (need_do_trigger && OB_FAIL(ObDMLService::process_delete_row(del_ctdef, del_rtdef, is_skipped, *this))) { LOG_WARN("process delete row failed", K(ret)); @@ -339,7 +337,7 @@ int ObTableReplaceOp::delete_row_to_das(bool need_do_trigger) break; } else if (OB_FAIL(calc_delete_tablet_loc(del_ctdef, del_rtdef, tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); - } else if (OB_FAIL(ObDMLService::delete_row(del_ctdef, del_rtdef, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::delete_row(del_ctdef, del_rtdef, tablet_loc, dml_rtctx_, stored_row))) { LOG_WARN("insert row with das failed", K(ret)); } else { LOG_DEBUG("delete one row", KPC(tablet_loc), "del row", @@ -441,6 +439,8 @@ int ObTableReplaceOp::do_replace_into() LOG_WARN("fail to load all row", K(ret)); } else if (OB_FAIL(post_all_dml_das_task())) { LOG_WARN("fail to post all das task", K(ret)); + } else if (!check_is_duplicated() && OB_FAIL(ObDMLService::handle_after_row_processing_batch(&dml_modify_rows_))) { + LOG_WARN("try insert is not duplicated, failed to process foreign key handle", K(ret)); } else if (!check_is_duplicated()) { LOG_DEBUG("try insert is not duplicated", K(ret)); } else if (OB_FAIL(fetch_conflict_rowkey())) { @@ -460,6 +460,8 @@ int ObTableReplaceOp::do_replace_into() LOG_WARN("fail to prepare final das task", K(ret)); } else if (OB_FAIL(post_all_dml_das_task())) { LOG_WARN("do insert rows post process failed", K(ret)); + } else if (OB_FAIL(ObDMLService::handle_after_row_processing_batch(&dml_modify_rows_))) { + LOG_WARN("try insert is duplicated, failed to process foreign key handle", K(ret)); } if (OB_SUCC(ret) && !is_iter_end) { @@ -522,6 +524,7 @@ int ObTableReplaceOp::replace_conflict_row_cache() for (int64_t i = 0; OB_SUCC(ret) && i < constraint_values.count(); ++i) { //delete duplicated row const ObChunkDatumStore::StoredRow *delete_row = constraint_values.at(i).current_datum_row_; + ObDMLModifyRowNode modify_row(this, &del_ctdef, &del_rtdef, ObDmlEventType::DE_DELETING); bool same_row = false; if (OB_ISNULL(delete_row)) { ret = OB_ERR_UNEXPECTED; @@ -538,6 +541,12 @@ int ObTableReplaceOp::replace_conflict_row_cache() LOG_WARN("check value failed", K(ret), KPC(replace_row), KPC(delete_row)); } } + if (OB_SUCC(ret)) { + modify_row.old_row_ = const_cast(delete_row); + if (need_after_row_process(del_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); + } + } if (OB_SUCC(ret) && !same_row) { delete_rows_++; } @@ -549,12 +558,6 @@ int ObTableReplaceOp::replace_conflict_row_cache() LOG_WARN("flush replace_row to exprs failed", K(ret), KPC(replace_row)); } else if (OB_FAIL(ObDMLService::process_insert_row(ins_ctdef, ins_rtdef, *this, is_skipped))) { LOG_WARN("convert exprs to stored_row failed", K(ret), KPC(insert_new_row)); - } else if (ins_ctdef.is_primary_index_ && - OB_FAIL(TriggerHandle::do_handle_after_row(*this, - ins_ctdef.trig_ctdef_, - ins_rtdef.trig_rtdef_, - ObTriggerEvents::get_insert_event()))) { - LOG_WARN("failed to handle before trigger", K(ret)); } else if (OB_UNLIKELY(is_skipped)) { continue; } else if (OB_FAIL(conflict_checker_.convert_exprs_to_stored_row(get_primary_table_new_row(), @@ -563,6 +566,13 @@ int ObTableReplaceOp::replace_conflict_row_cache() } else if (OB_FAIL(conflict_checker_.insert_new_row(insert_new_row, ObNewRowSource::FROM_INSERT))) { LOG_WARN("insert new to conflict_checker failed", K(ret), KPC(insert_new_row)); } + if (OB_SUCC(ret)) { + ObDMLModifyRowNode modify_row(this, &ins_ctdef, &ins_rtdef, ObDmlEventType::DE_INSERTING); + modify_row.new_row_ = insert_new_row; + if (need_after_row_process(del_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); + } + } } // while row store end ret = OB_ITER_END == ret ? OB_SUCCESS : ret; return ret; @@ -716,6 +726,7 @@ int ObTableReplaceOp::reset_das_env() LOG_WARN("close all das task failed", K(ret)); } else { dml_rtctx_.das_ref_.reuse(); + dml_modify_rows_.clear(); } // 因为第二次插入不需要fetch conflict result了,如果有conflict @@ -743,6 +754,7 @@ int ObTableReplaceOp::reuse() if (OB_FAIL(conflict_checker_.reuse())) { LOG_WARN("fail to reuse conflict checker", K(ret)); } else { + dml_modify_rows_.clear(); replace_row_store_.reset(); } } diff --git a/src/sql/engine/dml/ob_table_update_op.cpp b/src/sql/engine/dml/ob_table_update_op.cpp index b79be6dafe..8218044990 100644 --- a/src/sql/engine/dml/ob_table_update_op.cpp +++ b/src/sql/engine/dml/ob_table_update_op.cpp @@ -327,6 +327,7 @@ OB_INLINE int ObTableUpdateOp::update_row_to_das() ObUpdRtDef &upd_rtdef = rtdefs.at(j); ObDASTabletLoc *old_tablet_loc = nullptr; ObDASTabletLoc *new_tablet_loc = nullptr; + ObDMLModifyRowNode modify_row(this, &upd_ctdef, &upd_rtdef, ObDmlEventType::DE_UPDATING); bool is_skipped = false; if (!MY_SPEC.upd_ctdefs_.at(0).at(0)->has_instead_of_trigger_) { ++upd_rtdef.cur_row_num_; @@ -339,11 +340,11 @@ OB_INLINE int ObTableUpdateOp::update_row_to_das() break; } else if (OB_FAIL(calc_tablet_loc(upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc))) { LOG_WARN("calc partition key failed", K(ret)); - } else if (OB_FAIL(TriggerHandle::do_handle_after_row( - *this, upd_ctdef.trig_ctdef_, upd_rtdef.trig_rtdef_, ObTriggerEvents::get_update_event()))) { - LOG_WARN("failed to handle after trigger", K(ret)); - } else if (OB_FAIL(ObDMLService::update_row(upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::update_row(upd_ctdef, upd_rtdef, old_tablet_loc, new_tablet_loc, dml_rtctx_, + modify_row.old_row_, modify_row.new_row_, modify_row.full_row_))) { LOG_WARN("insert row with das failed", K(ret)); + } else if (need_after_row_process(upd_ctdef) && OB_FAIL(dml_modify_rows_.push_back(modify_row))) { + LOG_WARN("failed to push dml modify row to modified row list", K(ret)); } else { ++upd_rtdef.found_rows_; } diff --git a/src/sql/engine/dml/ob_table_update_op.h b/src/sql/engine/dml/ob_table_update_op.h index 06e94db6ac..57cf3e1792 100644 --- a/src/sql/engine/dml/ob_table_update_op.h +++ b/src/sql/engine/dml/ob_table_update_op.h @@ -108,6 +108,7 @@ protected: int check_update_affected_row(); virtual int write_row_to_das_buffer() override; virtual int write_rows_post_proc(int last_errno) override; + protected: UpdRtDef2DArray upd_rtdefs_; //see the comment of UpdCtDef2DArray common::ObArrayWrap ins_rtdefs_; diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp index 350df9da9d..0ee32fb4c2 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_delete_op.cpp @@ -202,6 +202,7 @@ int ObPxMultiPartDeleteOp::write_rows(ObExecContext &ctx, } else { while (OB_SUCC(ret)) { clear_evaluated_flag(); + ObChunkDatumStore::StoredRow* stored_row = nullptr; if (OB_FAIL(try_check_status())) { LOG_WARN("check status failed", K(ret)); } else if (OB_FAIL(dml_row_iter.get_next_row(child_->get_spec().output_))) { @@ -210,8 +211,10 @@ int ObPxMultiPartDeleteOp::write_rows(ObExecContext &ctx, } else { iter_end_ = true; } - } else if (OB_FAIL(ObDMLService::delete_row(MY_SPEC.del_ctdef_, del_rtdef_, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::delete_row(MY_SPEC.del_ctdef_, del_rtdef_, tablet_loc, dml_rtctx_, stored_row))) { LOG_WARN("delete row to das failed", K(ret)); + } else if (OB_FAIL(discharge_das_write_buffer())) { + LOG_WARN("failed to submit all dml task when the buffer of das op is full", K(ret)); } } diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp index 3f1cec18cd..337953b923 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_insert_op.cpp @@ -176,6 +176,7 @@ int ObPxMultiPartInsertOp::write_rows(ObExecContext &ctx, LOG_WARN("get physical plan context failed", K(ret)); } else { while (OB_SUCC(ret)) { + ObChunkDatumStore::StoredRow* stored_row = nullptr; clear_evaluated_flag(); if (OB_FAIL(try_check_status())) { LOG_WARN("check status failed", K(ret)); @@ -185,8 +186,10 @@ int ObPxMultiPartInsertOp::write_rows(ObExecContext &ctx, } else { iter_end_ = true; } - } else if (OB_FAIL(ObDMLService::insert_row(MY_SPEC.ins_ctdef_, ins_rtdef_, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::insert_row(MY_SPEC.ins_ctdef_, ins_rtdef_, tablet_loc, dml_rtctx_, stored_row))) { LOG_WARN("insert row to das failed", K(ret)); + } else if (OB_FAIL(discharge_das_write_buffer())) { + LOG_WARN("failed to submit all dml task when the buffer of das op is full", K(ret)); } } diff --git a/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp b/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp index a19a470795..8e2ca85daa 100644 --- a/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp +++ b/src/sql/engine/pdml/static/ob_px_multi_part_update_op.cpp @@ -100,12 +100,13 @@ int ObPxMultiPartUpdateOp::update_row_to_das(const ObDASTabletLoc *tablet_loc) { int ret = OB_SUCCESS; bool is_skipped = false; + ObChunkDatumStore::StoredRow* stored_row = nullptr; ++upd_rtdef_.cur_row_num_; if (OB_FAIL(ObDMLService::process_update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, is_skipped, *this))) { LOG_WARN("process update row failed", K(ret)); } else if (is_skipped) { //do nothing - } else if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_))) { + } else if (OB_FAIL(ObDMLService::update_row(MY_SPEC.upd_ctdef_, upd_rtdef_, tablet_loc, tablet_loc, dml_rtctx_, stored_row, stored_row, stored_row))) { LOG_WARN("insert row with das failed", K(ret)); } else { ++upd_rtdef_.found_rows_; @@ -232,6 +233,8 @@ int ObPxMultiPartUpdateOp::write_rows(ObExecContext &ctx, } } else if (OB_FAIL(update_row_to_das(tablet_loc))) { LOG_WARN("update row to das failed", K(ret)); + } else if (OB_FAIL(discharge_das_write_buffer())) { + LOG_WARN("failed to submit all dml task when the buffer of das op is full", K(ret)); } } diff --git a/src/sql/engine/table/ob_table_scan_op.cpp b/src/sql/engine/table/ob_table_scan_op.cpp index b150142805..88b7585723 100644 --- a/src/sql/engine/table/ob_table_scan_op.cpp +++ b/src/sql/engine/table/ob_table_scan_op.cpp @@ -869,6 +869,9 @@ OB_INLINE int ObTableScanOp::init_das_scan_rtdef(const ObDASScanCtDef &das_ctdef das_rtdef.tx_lock_timeout_ = my_session->get_trx_lock_timeout(); das_rtdef.scan_flag_ = MY_CTDEF.scan_flags_; das_rtdef.scan_flag_.is_show_seed_ = plan_ctx->get_show_seed(); + if(is_foreign_check_nested_session() && stmt::T_SELECT == ctx_.get_sql_ctx()->stmt_type_) { + das_rtdef.is_for_foreign_check_ = true; + } if (MY_SPEC.batch_scan_flag_ || is_lookup) { das_rtdef.scan_flag_.scan_order_ = ObQueryFlag::KeepOrder; } diff --git a/src/sql/engine/table/ob_table_scan_op.h b/src/sql/engine/table/ob_table_scan_op.h index 1790ca7c8e..52ceb3fb60 100644 --- a/src/sql/engine/table/ob_table_scan_op.h +++ b/src/sql/engine/table/ob_table_scan_op.h @@ -473,6 +473,8 @@ protected: eval_ctx_, pd_expr_spec.max_batch_size_); } } + bool is_foreign_check_nested_session() { return ObSQLUtils::is_fk_nested_sql(&ctx_);} + private: const ObTableScanSpec& get_tsc_spec() {return MY_SPEC;} const ObTableScanCtDef& get_tsc_ctdef() {return MY_SPEC.tsc_ctdef_;} diff --git a/src/storage/CMakeLists.txt b/src/storage/CMakeLists.txt index a285f49e57..056eba0e9b 100644 --- a/src/storage/CMakeLists.txt +++ b/src/storage/CMakeLists.txt @@ -472,6 +472,7 @@ ob_set_subtarget(ob_storage memtable memtable/ob_multi_source_data.cpp memtable/ob_redo_log_generator.cpp memtable/ob_row_compactor.cpp + memtable/ob_row_conflict_handler.cpp memtable/ob_concurrent_control.cpp ) diff --git a/src/storage/access/ob_table_access_param.cpp b/src/storage/access/ob_table_access_param.cpp index 7e98a96e11..3e5184ce06 100644 --- a/src/storage/access/ob_table_access_param.cpp +++ b/src/storage/access/ob_table_access_param.cpp @@ -39,6 +39,7 @@ ObTableIterParam::ObTableIterParam() is_same_schema_column_(false), vectorized_enabled_(false), has_virtual_columns_(false), + is_for_foreign_check_(false), ss_rowkey_prefix_cnt_(0), pd_storage_flag_(0) { @@ -66,6 +67,7 @@ void ObTableIterParam::reset() ss_rowkey_prefix_cnt_ = 0; vectorized_enabled_ = false; has_virtual_columns_ = false; + is_for_foreign_check_ = false; } bool ObTableIterParam::is_valid() const @@ -192,6 +194,7 @@ int ObTableAccessParam::init( iter_param_.out_cols_project_ = &table_param.get_output_projector(); iter_param_.agg_cols_project_ = &table_param.get_aggregate_projector(); iter_param_.need_scn_ = scan_param.need_scn_; + iter_param_.is_for_foreign_check_ = scan_param.is_for_foreign_check_; padding_cols_ = &table_param.get_pad_col_projector(); projector_size_ = scan_param.projector_size_; diff --git a/src/storage/access/ob_table_access_param.h b/src/storage/access/ob_table_access_param.h index 9ea2148d85..8db65fdb66 100644 --- a/src/storage/access/ob_table_access_param.h +++ b/src/storage/access/ob_table_access_param.h @@ -158,6 +158,7 @@ public: bool is_same_schema_column_; bool vectorized_enabled_; bool has_virtual_columns_; + bool is_for_foreign_check_; int64_t ss_rowkey_prefix_cnt_; union { struct { diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp index 8260fba39d..e0c8635adb 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.cpp +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.cpp @@ -17,6 +17,7 @@ #include "storage/access/ob_block_row_store.h" #include "storage/access/ob_block_batched_row_store.h" #include "storage/access/ob_index_sstable_estimator.h" +#include "storage/memtable/ob_row_conflict_handler.h" #include "storage/blocksstable/ob_index_block_row_scanner.h" #include "storage/tx_table/ob_tx_table.h" #include "storage/tx/ob_tx_data_functor.h" @@ -1025,6 +1026,8 @@ int ObMultiVersionMicroBlockRowScanner::inner_inner_get_next_row( bool read_uncommitted_row = false; bool is_ghost_row_flag = false; const int64_t snapshot_version = context_->trans_version_range_.snapshot_version_; + memtable::ObMvccAccessCtx &acc_ctx = context_->store_ctx_->mvcc_acc_ctx_; + if (OB_UNLIKELY(context_->query_flag_.is_ignore_trans_stat())) { version_fit = true; } else if (OB_FAIL(reader_->get_multi_version_info( @@ -1041,18 +1044,47 @@ int ObMultiVersionMicroBlockRowScanner::inner_inner_get_next_row( } else if (FALSE_IT(flag = row_header->get_row_multi_version_flag())) { } else if (flag.is_uncommitted_row()) { have_uncommited_row = true; // TODO @lvling check transaction status instead - auto &acc_ctx = context_->store_ctx_->mvcc_acc_ctx_; transaction::ObLockForReadArg lock_for_read_arg(acc_ctx, - row_header->get_trans_id(), + transaction::ObTransID(row_header->get_trans_id()), sql_sequence, context_->query_flag_.read_latest_); - if (OB_FAIL(lock_for_read(lock_for_read_arg, + if (OB_FAIL(lock_for_read(lock_for_read_arg, can_read, trans_version, is_determined_state))) { - STORAGE_LOG(WARN, "fail to check transaction status", K(ret), KPC(row_header), K_(macro_id)); + STORAGE_LOG(WARN, "fail to check transaction status", K(ret), KPC(row_header), K_(macro_id)); + } + } + + if (OB_SUCC(ret)) { + ObStoreRowLockState lock_state; + if (param_->is_for_foreign_check_ && + OB_FAIL(ObRowConflictHandler::check_foreign_key_constraint_for_sstable( + acc_ctx.get_tx_table_guard(), + acc_ctx.get_tx_id(), + transaction::ObTransID(row_header->get_trans_id()), + sql_sequence, + trans_version, + snapshot_version, + lock_state))) { + if (OB_TRY_LOCK_ROW_CONFLICT == ret) { + int tmp_ret = OB_SUCCESS; + ObStoreRowkey store_rowkey; + ObDatumRowkeyHelper rowkey_helper; + if (OB_TMP_FAIL(get_store_rowkey(store_rowkey, rowkey_helper))) { + LOG_WARN("get store rowkey fail", K(tmp_ret)); + } else { + ObRowConflictHandler::post_row_read_conflict( + acc_ctx, + store_rowkey, + lock_state, + context_->tablet_id_, + context_->ls_id_, + 0, 0 /* these two params get from mvcc_row, and for statistics, so we ignore them */); + } } + } } if (OB_FAIL(ret)) { @@ -1290,6 +1322,31 @@ int ObMultiVersionMicroBlockRowScanner::lock_for_read( return ret; } +int ObMultiVersionMicroBlockRowScanner::get_store_rowkey(ObStoreRowkey &store_rowkey, + ObDatumRowkeyHelper &rowkey_helper) +{ + int ret = OB_SUCCESS; + ObDatumRowkey datum_rowkey; + if (IS_NOT_INIT) { + ret = OB_NOT_INIT; + LOG_WARN("not init", K(ret)); + } else if (OB_FAIL(row_.reserve(read_info_->get_request_count()))) { + LOG_WARN("Fail to reserve datum row", K(ret), K_(row)); + } else if (OB_FAIL(end_of_block())) { + if (OB_UNLIKELY(OB_ITER_END != ret)) { + LOG_WARN("fail to judge end of block or not", K(ret)); + } + } else if (OB_FAIL(reader_->get_row(current_, row_))) { + LOG_WARN("micro block reader fail to get block_row", K(ret), K(current_)); + } else if (OB_FAIL(datum_rowkey.assign(row_.storage_datums_, read_info_->get_schema_rowkey_count()))) { + LOG_WARN("assign datum_rowkey fail", K(ret), K(row_), KPC(read_info_)); + } else if (OB_FAIL(rowkey_helper.convert_store_rowkey(datum_rowkey, read_info_->get_columns_desc(), store_rowkey))) { + LOG_WARN("convert datumn_rowkey to store_rowkey fail", K(ret), KPC(read_info_), K(datum_rowkey)); + } + + return ret; +} + ////////////////////////////// ObMultiVersionMicroBlockMinorMergeRowScannerV2 ////////////////////////////// void ObMultiVersionMicroBlockMinorMergeRowScanner::reuse() { diff --git a/src/storage/blocksstable/ob_micro_block_row_scanner.h b/src/storage/blocksstable/ob_micro_block_row_scanner.h index 9a0ac4ac99..26ac502a57 100644 --- a/src/storage/blocksstable/ob_micro_block_row_scanner.h +++ b/src/storage/blocksstable/ob_micro_block_row_scanner.h @@ -230,6 +230,10 @@ private: bool &can_read, int64_t &trans_version, bool &is_determined_state); + // The store_rowkey is a decoration of the ObObj pointer, + // and it will be destroyed when the life cycle of the rowkey_helper is end. + // So we have to send it into the function to avoid this situation. + int get_store_rowkey(ObStoreRowkey &store_rowkey, ObDatumRowkeyHelper &rowkey_helper); private: ObDatumRow prev_micro_row_; storage::ObNopPos nop_pos_; diff --git a/src/storage/memtable/mvcc/ob_mvcc_acc_ctx.h b/src/storage/memtable/mvcc/ob_mvcc_acc_ctx.h index 36636bd3df..f9fa72e192 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_acc_ctx.h +++ b/src/storage/memtable/mvcc/ob_mvcc_acc_ctx.h @@ -50,7 +50,8 @@ public: mem_ctx_(NULL), tx_scn_(-1), write_flag_(), - handle_start_time_(OB_INVALID_TIMESTAMP) + handle_start_time_(OB_INVALID_TIMESTAMP), + lock_wait_start_ts_(0) {} ~ObMvccAccessCtx() { type_ = T::INVL; @@ -186,6 +187,12 @@ public: ObMemtableCtx *get_mem_ctx() const { return mem_ctx_; } + transaction::ObTxDesc *get_tx_desc() const { + return tx_desc_; + } + int64_t get_lock_wait_start_ts() const { return lock_wait_start_ts_; } + void set_lock_wait_start_ts(const int64_t lock_wait_start_ts) + { lock_wait_start_ts_ = lock_wait_start_ts; } bool is_read() const { return type_ == T::STRONG_READ || type_ == T::WEAK_READ; } bool is_weak_read() const { return type_ == T::WEAK_READ; } bool is_write() const { return type_ == T::WRITE; } @@ -219,7 +226,9 @@ public: KP_(tx_ctx), KP_(mem_ctx), K_(tx_scn), - K_(write_flag)); + K_(write_flag), + K_(handle_start_time), + K_(lock_wait_start_ts)); private: void warn_tx_ctx_leaky_(); public: // NOTE: those field should only be accessed by txn relative routine @@ -250,6 +259,8 @@ public: // NOTE: those field should only be accessed by txn relative routine // this was used for runtime mertic int64_t handle_start_time_; +protected: + int64_t lock_wait_start_ts_; }; } // memtable } // oceanbase diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp index 2f609a6987..da58fbdfa4 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.cpp @@ -325,6 +325,23 @@ void ObMvccValueIterator::move_to_next_node_() } } +int ObMvccValueIterator::check_row_locked(ObStoreRowLockState &lock_state) +{ + int ret = OB_SUCCESS; + if (IS_NOT_INIT) { + TRANS_LOG(WARN, "not init", KP(this)); + ret = OB_NOT_INIT; + } else if (OB_ISNULL(value_)) { + ret = OB_SUCCESS; + TRANS_LOG(WARN, "get value iter but mvcc row in it is null", K(ret)); + } else if (OB_FAIL(value_->check_row_locked(*ctx_, lock_state))){ + TRANS_LOG(WARN, "check row locked fail", K(ret), KPC(value_), KPC(ctx_), K(lock_state)); + } else { + lock_state.mvcc_row_ = value_; + } + return ret; +} + //////////////////////////////////////////////////////////////////////////////////////////////////// ObMvccRowIterator::ObMvccRowIterator() diff --git a/src/storage/memtable/mvcc/ob_mvcc_iterator.h b/src/storage/memtable/mvcc/ob_mvcc_iterator.h index 5c72a39f02..af5d3b11d7 100644 --- a/src/storage/memtable/mvcc/ob_mvcc_iterator.h +++ b/src/storage/memtable/mvcc/ob_mvcc_iterator.h @@ -21,6 +21,10 @@ namespace oceanbase { +namespace storage +{ +class ObStoreRowLockState; +} namespace memtable { @@ -128,6 +132,11 @@ public: version_iter_ = NULL; last_trans_version_ = share::SCN::max_scn(); } + int check_row_locked(storage::ObStoreRowLockState &lock_state); + const transaction::ObTransID get_trans_id() const { return ctx_->get_tx_id(); } + share::SCN get_snapshot_version() const { return ctx_->get_snapshot_version(); } + ObMvccAccessCtx *get_mvcc_acc_ctx() { return ctx_; } + const ObMvccAccessCtx *get_mvcc_acc_ctx() const { return ctx_; } const ObMvccRow *get_mvcc_row() const { return value_; } const ObMvccTransNode *get_trans_node() const { return version_iter_; } private: diff --git a/src/storage/memtable/ob_memtable.cpp b/src/storage/memtable/ob_memtable.cpp index 50b97e8863..7ceeae0449 100644 --- a/src/storage/memtable/ob_memtable.cpp +++ b/src/storage/memtable/ob_memtable.cpp @@ -28,6 +28,7 @@ #include "storage/memtable/ob_memtable_util.h" #include "storage/memtable/ob_memtable_context.h" #include "storage/memtable/ob_lock_wait_mgr.h" +#include "storage/memtable/ob_row_conflict_handler.h" #include "storage/memtable/ob_concurrent_control.h" #include "storage/compaction/ob_tablet_merge_task.h" #include "storage/compaction/ob_schedule_dag_func.h" @@ -785,6 +786,7 @@ int ObMemtable::get( TRANS_LOG(WARN, "Unexpected null read info", K(ret), K(param), K(context.use_fuse_row_cache_)); } else { const ObColDescIArray &out_cols = read_info->get_columns_desc(); + ObStoreRowLockState lock_state; if (OB_FAIL(parameter_mtk.encode(out_cols, &rowkey.get_store_rowkey()))) { TRANS_LOG(WARN, "mtk encode fail", "ret", ret); } else if (OB_FAIL(mvcc_engine_.get(context.store_ctx_->mvcc_acc_ctx_, @@ -794,6 +796,18 @@ int ObMemtable::get( &returned_mtk, value_iter))) { TRANS_LOG(WARN, "fail to do mvcc engine get", K(ret)); + } else if (param.is_for_foreign_check_ && + OB_FAIL(ObRowConflictHandler::check_foreign_key_constraint_for_memtable(&value_iter, lock_state))) { + if (OB_TRY_LOCK_ROW_CONFLICT == ret) { + ObRowConflictHandler::post_row_read_conflict( + *value_iter.get_mvcc_acc_ctx(), + *parameter_mtk.get_rowkey(), + lock_state, + key_.tablet_id_, + freezer_->get_ls_id(), + value_iter.get_mvcc_row()->get_last_compact_cnt(), + value_iter.get_mvcc_row()->get_total_trans_node_cnt()); + } } else { if (OB_UNLIKELY(!row.is_valid())) { if (OB_FAIL(row.init(*context.stmt_allocator_, out_cols.count()))) { diff --git a/src/storage/memtable/ob_memtable_iterator.cpp b/src/storage/memtable/ob_memtable_iterator.cpp index e4687fb8cd..8c9811111c 100644 --- a/src/storage/memtable/ob_memtable_iterator.cpp +++ b/src/storage/memtable/ob_memtable_iterator.cpp @@ -21,6 +21,7 @@ #include "storage/memtable/ob_memtable_data.h" #include "storage/memtable/mvcc/ob_mvcc_engine.h" #include "storage/memtable/mvcc/ob_mvcc_row.h" +#include "storage/memtable/ob_row_conflict_handler.h" #include "storage/tx/ob_trans_define.h" #include "ob_memtable_context.h" #include "ob_memtable.h" @@ -347,11 +348,26 @@ int ObMemtableScanIterator::inner_get_next_row(const ObDatumRow *&row) key->get_rowkey(rowkey); bool is_committed = false; - if (OB_NOT_NULL(value_iter) && OB_NOT_NULL(value_iter->get_trans_node()) - && value_iter->get_trans_node()->is_committed()) { + const ObMvccTransNode *latest_trans_node = value_iter->get_trans_node(); + if (OB_NOT_NULL(latest_trans_node) + && latest_trans_node->is_committed()) { is_committed = true; } - if (OB_FAIL(ObReadRow::iterate_row(*read_info_, *rowkey, *(context_->allocator_), *value_iter, row_, bitmap_, row_scn))) { + + ObStoreRowLockState lock_state; + if (param_->is_for_foreign_check_ && + OB_FAIL(ObRowConflictHandler::check_foreign_key_constraint_for_memtable(value_iter, lock_state))) { + if (OB_TRY_LOCK_ROW_CONFLICT == ret) { + ObRowConflictHandler::post_row_read_conflict( + *value_iter->get_mvcc_acc_ctx(), + *rowkey, + lock_state, + context_->tablet_id_, + context_->ls_id_, + value_iter->get_mvcc_row()->get_last_compact_cnt(), + value_iter->get_mvcc_row()->get_total_trans_node_cnt()); + } + } else if (OB_FAIL(ObReadRow::iterate_row(*read_info_, *rowkey, *(context_->allocator_), *value_iter, row_, bitmap_, row_scn))) { TRANS_LOG(WARN, "iterate_row fail", K(ret), K(*rowkey), KP(value_iter)); } else { STORAGE_LOG(DEBUG, "chaser debug memtable next row", K(row_)); @@ -1419,4 +1435,3 @@ int ObReadRow::iterate_row( } } - diff --git a/src/storage/memtable/ob_row_conflict_handler.cpp b/src/storage/memtable/ob_row_conflict_handler.cpp new file mode 100644 index 0000000000..5d63947316 --- /dev/null +++ b/src/storage/memtable/ob_row_conflict_handler.cpp @@ -0,0 +1,171 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ +#define USING_LOG_PREFIX TRANS +#include "ob_row_conflict_handler.h" +#include "storage/memtable/mvcc/ob_mvcc_iterator.h" +#include "storage/memtable/ob_lock_wait_mgr.h" + +namespace oceanbase { +using namespace common; +using namespace memtable; +using namespace transaction; +namespace storage { +int ObRowConflictHandler::check_foreign_key_constraint_for_memtable(ObMvccValueIterator *value_iter, + ObStoreRowLockState &lock_state) +{ + int ret = OB_SUCCESS; + if (OB_ISNULL(value_iter)) { + ret = OB_BAD_NULL_ERROR; + TRANS_LOG(ERROR, "the ObMvccValueIterator is null", K(ret)); + } else if (OB_FAIL(value_iter->check_row_locked(lock_state))) { + TRANS_LOG(WARN, "check row locked fail", K(ret), K(lock_state)); + } else { + const ObMvccAccessCtx *ctx = value_iter->get_mvcc_acc_ctx(); + const ObTransID my_tx_id = ctx->get_tx_id(); + const share::SCN snapshot_version = ctx->get_snapshot_version(); + if (lock_state.is_locked_ && my_tx_id != lock_state.lock_trans_id_) { + ret = OB_TRY_LOCK_ROW_CONFLICT; + if (REACH_TIME_INTERVAL(1000 * 1000)) { + TRANS_LOG(WARN, "meet lock conflict on memtable", K(ret), K(lock_state.lock_trans_id_), K(my_tx_id)); + } + } else if (!lock_state.is_locked_ && lock_state.trans_version_ > snapshot_version) { + ret = OB_TRANSACTION_SET_VIOLATION; + if (REACH_TIME_INTERVAL(1000 * 1000)) { + TRANS_LOG(WARN, "meet tsc on memtable", K(ret), K(lock_state.trans_version_), K(snapshot_version)); + } + } + } + return ret; +} + +int ObRowConflictHandler::check_foreign_key_constraint_for_sstable(const ObTxTableGuard &tx_table_guard, + const ObTransID &read_trans_id, + const ObTransID &data_trans_id, + const int64_t sql_sequence, + const int64_t trans_version, + const int64_t snapshot_version, + ObStoreRowLockState &lock_state) { + int ret = OB_SUCCESS; + // If a transaction is committed, the trans_id of it is 0, which is invalid. + // So we can not use check_row_locekd interface to get the trans_version. + if (!data_trans_id.is_valid()) { + if (trans_version > snapshot_version) { + ret = OB_TRANSACTION_SET_VIOLATION; + TRANS_LOG(WARN, "meet tsc on sstable", K(ret), K(lock_state.trans_version_), K(snapshot_version)); + } + } else { + ObTxTable *tx_table = nullptr; + int64_t read_epoch = ObTxTable::INVALID_READ_EPOCH; + if (!tx_table_guard.is_valid()) { + ret = OB_ERR_UNEXPECTED; + TRANS_LOG(ERROR, "tx table guard is invalid", KR(ret)); + } else if (FALSE_IT(tx_table = tx_table_guard.get_tx_table())) { + } else if (FALSE_IT(read_epoch = tx_table_guard.epoch())) { + } else if (OB_FAIL(tx_table->check_row_locked( + read_trans_id, data_trans_id, sql_sequence, read_epoch, lock_state))){ + TRANS_LOG(WARN, "check row locked fail", K(ret), K(read_trans_id), K(data_trans_id), K(sql_sequence), K(read_epoch), K(lock_state)); + } + if (lock_state.is_locked_ && read_trans_id != lock_state.lock_trans_id_) { + ret = OB_TRY_LOCK_ROW_CONFLICT; + if (REACH_TIME_INTERVAL(1000 * 1000)) { + TRANS_LOG(WARN, "meet lock conflict on sstable", K(ret), K(lock_state.lock_trans_id_), K(read_trans_id)); + } + } else if (!lock_state.is_locked_ && lock_state.trans_version_.get_val_for_tx() > snapshot_version) { + ret = OB_TRANSACTION_SET_VIOLATION; + if (REACH_TIME_INTERVAL(1000 * 1000)) { + TRANS_LOG(WARN, "meet tsc on sstable", K(ret), K(lock_state.trans_version_), K(snapshot_version)); + } + } + } + return ret; +} + +int ObRowConflictHandler::post_row_read_conflict(ObMvccAccessCtx &acc_ctx, + const ObStoreRowkey &row_key, + ObStoreRowLockState &lock_state, + const ObTabletID tablet_id, + const share::ObLSID ls_id, + const int64_t last_compact_cnt, + const int64_t total_trans_node_cnt) +{ + int ret = OB_TRY_LOCK_ROW_CONFLICT; + ObLockWaitMgr *lock_wait_mgr = NULL; + ObTransID conflict_tx_id = lock_state.lock_trans_id_; + // auto mem_ctx = acc_ctx.get_mem_ctx(); + ObTxDesc *tx_desc = acc_ctx.get_tx_desc(); + int64_t current_ts = common::ObClockGenerator::getClock(); + int64_t lock_wait_start_ts = acc_ctx.get_lock_wait_start_ts() > 0 + ? acc_ctx.get_lock_wait_start_ts() + : current_ts; + int64_t lock_wait_expire_ts = acc_ctx.eval_lock_expire_ts(lock_wait_start_ts); + if (current_ts >= lock_wait_expire_ts) { + ret = OB_ERR_EXCLUSIVE_LOCK_CONFLICT; + TRANS_LOG(WARN, "exclusive lock conflict", K(ret), K(row_key), + K(conflict_tx_id), K(acc_ctx), K(lock_wait_expire_ts)); + } else if (OB_ISNULL(lock_wait_mgr = MTL_WITH_CHECK_TENANT(ObLockWaitMgr*, + tx_desc->get_tenant_id()))) { + TRANS_LOG(WARN, "can not get tenant lock_wait_mgr MTL", K(tx_desc->get_tenant_id())); + } else { + int tmp_ret = OB_SUCCESS; + ObTransID tx_id = acc_ctx.get_tx_id(); + ObAddr scheduler_addr; + if (OB_FAIL(ObTransDeadlockDetectorAdapter:: + get_trans_scheduler_info_on_participant(conflict_tx_id, ls_id, scheduler_addr))) { + TRANS_LOG(WARN, "get transaction scheduler info fail", K(ret), K(conflict_tx_id), K(tx_id), K(ls_id)); + } + ObTransIDAndAddr conflict_tx(conflict_tx_id, scheduler_addr); + tx_desc->add_conflict_tx(conflict_tx); + bool remote_tx = scheduler_addr != GCTX.self_addr(); + ObFunction recheck_func([&](bool &locked, bool &wait_on_row) -> int { + int ret = OB_SUCCESS; + lock_state.is_locked_ = false; + if (lock_state.is_delayed_cleanout_) { + auto lock_data_sequence = lock_state.lock_data_sequence_; + auto &tx_table_guard = acc_ctx.get_tx_table_guard(); + int64_t read_epoch = tx_table_guard.epoch(); + if (OB_FAIL(tx_table_guard.get_tx_table()->check_row_locked( + tx_id, conflict_tx_id, lock_data_sequence, read_epoch, lock_state))) { + TRANS_LOG(WARN, "re-check row locked via tx_table fail", K(ret), K(tx_id), K(lock_state)); + } + } else { + if (OB_FAIL(lock_state.mvcc_row_->check_row_locked(acc_ctx, lock_state))) { + TRANS_LOG(WARN, "re-check row locked via mvcc_row fail", K(ret), K(tx_id), K(lock_state)); + } + } + if (OB_SUCC(ret)) { + locked = lock_state.is_locked_ && lock_state.lock_trans_id_ != tx_id; + wait_on_row = !lock_state.is_delayed_cleanout_; + } + return ret; + }); + tmp_ret = lock_wait_mgr->post_lock(OB_TRY_LOCK_ROW_CONFLICT, + tablet_id, + row_key, + lock_wait_expire_ts, + remote_tx, + last_compact_cnt, + total_trans_node_cnt, + tx_id, + conflict_tx_id, + recheck_func); + if (OB_SUCCESS != tmp_ret) { + TRANS_LOG(WARN, "post_lock after tx conflict failed", + K(tmp_ret), K(tx_id), K(conflict_tx_id)); + } else if (acc_ctx.get_lock_wait_start_ts() <= 0) { + acc_ctx.set_lock_wait_start_ts(lock_wait_start_ts); + } + } + return ret; +} + +} +} diff --git a/src/storage/memtable/ob_row_conflict_handler.h b/src/storage/memtable/ob_row_conflict_handler.h new file mode 100644 index 0000000000..588318f7b3 --- /dev/null +++ b/src/storage/memtable/ob_row_conflict_handler.h @@ -0,0 +1,70 @@ +/** + * Copyright (c) 2021 OceanBase + * OceanBase CE is licensed under Mulan PubL v2. + * You can use this software according to the terms and conditions of the Mulan PubL v2. + * You may obtain a copy of Mulan PubL v2 at: + * http://license.coscl.org.cn/MulanPubL-2.0 + * THIS SOFTWARE IS PROVIDED ON AN "AS IS" BASIS, WITHOUT WARRANTIES OF ANY KIND, + * EITHER EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO NON-INFRINGEMENT, + * MERCHANTABILITY OR FIT FOR A PARTICULAR PURPOSE. + * See the Mulan PubL v2 for more details. + */ + +#ifndef OB_ROW_CONFLICT_HANDLER_H_ +#define OB_ROW_CONFLICT_HANDLER_H_ + +#include +namespace oceanbase { +namespace memtable { +class ObMvccAccessCtx; +class ObMvccValueIterator; +} +namespace transaction { +class ObTransID; +} +namespace common { +class ObTabletID; +class ObStoreRowkey; +} +namespace share { +class ObLSID; +} +namespace storage { +class ObStoreRowLockState; +class ObTxTableGuard; + +class ObRowConflictHandler { +public: + // There are 2 cases that can lead to row conflict in foreign key constraint check: + // Case 1: the row is locked, mainly beacuse there's an uncommitted transaction on it. + // If the check meet this case, we should call post_row_read_conflict to put it into + // lock_wait_mgr and register deadlock decetion; + // + // Case 2: the row is committed, but the trans_version on it is larger than the + // snapshot_version of current transaction, it will cause tsc. + // If the check meet this case, we should return error code to sql layer, and it will + // choose to retry or throw an exception according to the isolation level. + static int check_foreign_key_constraint_for_memtable(memtable::ObMvccValueIterator *value_iter, + storage::ObStoreRowLockState &lock_state); + static int check_foreign_key_constraint_for_sstable(const storage::ObTxTableGuard &tx_table_guard, + const transaction::ObTransID &read_trans_id, + const transaction::ObTransID &data_trans_id, + const int64_t sql_sequence, + const int64_t trans_version, + const int64_t snapshot_version, + storage::ObStoreRowLockState &lock_state); + // TODO(yichang): This function is refered to ObMemtable::post_row_write_conflict_, + // but remove the mem_ctx and tx_ctx in the implement. I think ObMemtable can call + // this function, too. But it seems there's still a need to use mem_ctx to record + // some statistics. Maybe we can move these statistics to tx_desc then. + static int post_row_read_conflict(memtable::ObMvccAccessCtx &acc_ctx, + const common::ObStoreRowkey &row_key, + storage::ObStoreRowLockState &lock_state, + const common::ObTabletID tablet_id, + const share::ObLSID ls_id, + const int64_t last_compact_cnt, + const int64_t total_trans_node_cnt); +}; +} +} +#endif diff --git a/src/storage/tx/ob_trans_define_v4.cpp b/src/storage/tx/ob_trans_define_v4.cpp index c695d44768..faeabe70b3 100644 --- a/src/storage/tx/ob_trans_define_v4.cpp +++ b/src/storage/tx/ob_trans_define_v4.cpp @@ -779,6 +779,25 @@ void ObTxDesc::release_implicit_savepoint(const int64_t savepoint) } } +int ObTxDesc::add_conflict_tx(const ObTransIDAndAddr conflict_tx) { + ObSpinLockGuard guard(lock_); + return add_conflict_tx_(conflict_tx); +} + +int ObTxDesc::add_conflict_tx_(const ObTransIDAndAddr &conflict_tx) { + int ret = OB_SUCCESS; + if (cflict_txs_.count() >= MAX_RESERVED_CONFLICT_TX_NUM) { + ret = OB_SIZE_OVERFLOW; + int64_t max_reserved_conflict_tx_num = MAX_RESERVED_CONFLICT_TX_NUM; + DETECT_LOG(WARN, "too many conflict trans id", K(max_reserved_conflict_tx_num), K(cflict_txs_), K(conflict_tx)); + } else if (!is_contain(cflict_txs_, conflict_tx)) { + if (OB_FAIL(cflict_txs_.push_back(conflict_tx))) { + DETECT_LOG(WARN, "fail to push conflict tx to cflict_txs_", K(ret), K(cflict_txs_), K(conflict_tx)); + } + } + return ret; +} + int ObTxDesc::merge_conflict_txs(const ObIArray &conflict_txs) { ObSpinLockGuard guard(lock_); @@ -788,15 +807,14 @@ int ObTxDesc::merge_conflict_txs(const ObIArray &conflict_txs) int ObTxDesc::merge_conflict_txs_(const ObIArray &conflict_txs) { int ret = OB_SUCCESS; - for (int64_t idx = 0; idx < conflict_txs.count() && OB_SUCC(ret); ++idx) { - if (cflict_txs_.count() > MAX_RESERVED_CONFLICT_TX_NUM) { - int64_t max_reserved_conflict_tx_num = MAX_RESERVED_CONFLICT_TX_NUM; - DETECT_LOG(WARN, "too many conflict trans id", K(max_reserved_conflict_tx_num), K(cflict_txs_), K(conflict_txs)); - break; - } else if (is_contain(cflict_txs_, conflict_txs.at(idx))) { - continue; - } else if (OB_FAIL(cflict_txs_.push_back(conflict_txs.at(idx)))) { - DETECT_LOG(WARN, "fail to push conflict id to cflict_txs_", K(cflict_txs_), K(conflict_txs)); + int tmp_ret = OB_SUCCESS; + for (int64_t idx = 0; idx < conflict_txs.count() && OB_SUCC(tmp_ret); ++idx) { + // This function should try its best to push the conflict_tx into the array. + // However, whether the insertion is successful or not + // should not affect the normal execution process. + // So we just use tmp_ret to catch the error code here. + if (OB_TMP_FAIL(add_conflict_tx_(conflict_txs.at(idx)))) { + DETECT_LOG(WARN, "fail to add conflict tx to cflict_txs_", K(tmp_ret), K(cflict_txs_), K(conflict_txs.at(idx))); } } return ret; diff --git a/src/storage/tx/ob_trans_define_v4.h b/src/storage/tx/ob_trans_define_v4.h index 7313bea22d..fb80bf2f97 100644 --- a/src/storage/tx/ob_trans_define_v4.h +++ b/src/storage/tx/ob_trans_define_v4.h @@ -467,6 +467,7 @@ private: void execute_commit_cb(); private: int update_part_(ObTxPart &p, bool append = true); + int add_conflict_tx_(const ObTransIDAndAddr &conflict_tx); int merge_conflict_txs_(const ObIArray &conflict_ids); int update_parts_(const ObTxPartList &list); void implicit_start_tx_(); @@ -522,6 +523,7 @@ public: { ObSpinLockGuard guard(lock_); return array.assign(cflict_txs_); } void reset_conflict_txs() { ObSpinLockGuard guard(lock_); cflict_txs_.reset(); } + int add_conflict_tx(const ObTransIDAndAddr conflict_tx); int merge_conflict_txs(const ObIArray &conflict_ids); bool contain(const ObTransID &trans_id) const { return tx_id_ == trans_id; } /*used by TransHashMap*/ uint64_t get_tenant_id() const { return tenant_id_; } diff --git a/src/storage/tx_storage/ob_access_service.cpp b/src/storage/tx_storage/ob_access_service.cpp index 87a623dcad..22128b767f 100644 --- a/src/storage/tx_storage/ob_access_service.cpp +++ b/src/storage/tx_storage/ob_access_service.cpp @@ -222,6 +222,10 @@ int ObAccessService::table_scan( ObLS *ls = nullptr; ObLSTabletService *tablet_service = nullptr; ObTableScanParam ¶m = static_cast(vparam); + // TODO(yichang): maybe we need to move this scan_flag_ setting to sql layer? + if (param.is_for_foreign_check_) { + param.scan_flag_.set_iter_uncommitted_row(); + } ObStoreAccessType access_type = param.scan_flag_.is_read_latest() ? ObStoreAccessType::READ_LATEST : ObStoreAccessType::READ; SCN user_specified_snapshot_scn; @@ -427,6 +431,22 @@ int ObAccessService::check_read_allowed_( } } } + // If this select is for foreign key check, + // we should get tx_id and tx_desc for deadlock detection. + if (scan_param.is_for_foreign_check_) { + if (scan_param.tx_id_.is_valid()) { + ctx.mvcc_acc_ctx_.tx_id_ = scan_param.tx_id_; + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("foreign key check need scan_param.tx_id_ valid", K(ret), K(scan_param.tx_id_)); + } + if (OB_NOT_NULL(scan_param.trans_desc_) && scan_param.trans_desc_->is_valid()) { + ctx.mvcc_acc_ctx_.tx_desc_ = scan_param.trans_desc_; + } else { + ret = OB_INVALID_ARGUMENT; + LOG_WARN("foreign key check need scan_param.trans_desc_ valid", K(ret), KPC(scan_param.trans_desc_)); + } + } } return ret; }