[FOREIGN KEY] fix the bug about meet invalid conflict during checking child table which without index

This commit is contained in:
YangEfei 2023-09-14 23:47:53 +00:00 committed by ob-robot
parent 098c6384d4
commit f00019cb7c
19 changed files with 307 additions and 184 deletions

View File

@ -54,8 +54,10 @@ public:
jump_read_group_id_(-1),
flags_(0)
{
is_fk_cascading_ = 0;
need_check_server_ = 1;
same_server_ = 1;
iter_uncommitted_row_ = 0;
}
~ObDASCtx()
{
@ -139,10 +141,10 @@ public:
uint64_t is_fk_cascading_ : 1; //fk starts to trigger nested sql
uint64_t need_check_server_ : 1; //need to check if partitions hit the same server
uint64_t same_server_ : 1; //if partitions hit the same server, could be local or remote
uint64_t reserved_ : 62;
uint64_t iter_uncommitted_row_ : 1; //iter uncommitted row in fk_checker
uint64_t reserved_ : 60;
};
};
};
} // namespace sql
} // namespace oceanbase

View File

@ -44,7 +44,8 @@ OB_DEF_SERIALIZE(ObDASDMLBaseRtDef)
timeout_ts_,
sql_mode_,
prelock_,
tenant_schema_version_);
tenant_schema_version_,
is_for_foreign_key_check_);
return ret;
}
@ -55,7 +56,8 @@ OB_DEF_DESERIALIZE(ObDASDMLBaseRtDef)
timeout_ts_,
sql_mode_,
prelock_,
tenant_schema_version_);
tenant_schema_version_,
is_for_foreign_key_check_);
if (OB_SUCC(ret)) {
(void)ObSQLUtils::adjust_time_by_ntp_offset(timeout_ts_);
}
@ -69,7 +71,8 @@ OB_DEF_SERIALIZE_SIZE(ObDASDMLBaseRtDef)
timeout_ts_,
sql_mode_,
prelock_,
tenant_schema_version_);
tenant_schema_version_,
is_for_foreign_key_check_);
return len;
}

View File

@ -108,11 +108,13 @@ public:
K_(sql_mode),
K_(prelock),
K_(tenant_schema_version),
K_(is_for_foreign_key_check),
K_(affected_rows));
int64_t timeout_ts_;
ObSQLMode sql_mode_;
bool prelock_;
int64_t tenant_schema_version_;
bool is_for_foreign_key_check_;
int64_t affected_rows_;
const DASDMLCtDefArray *related_ctdefs_;
DASDMLRtDefArray *related_rtdefs_;
@ -123,6 +125,7 @@ protected:
sql_mode_(DEFAULT_OCEANBASE_MODE),
prelock_(false),
tenant_schema_version_(0),
is_for_foreign_key_check_(false),
affected_rows_(0),
related_ctdefs_(nullptr),
related_rtdefs_(nullptr)

View File

@ -1156,6 +1156,9 @@ int ObDMLService::init_dml_param(const ObDASDMLBaseCtDef &base_ctdef,
&& !dml_param.table_param_->get_data_table().can_read_index()) {
dml_param.write_flag_.set_is_write_only_index();
}
if (base_rtdef.is_for_foreign_key_check_) {
dml_param.write_flag_.set_check_row_locked();
}
return ret;
}
@ -1185,7 +1188,9 @@ int ObDMLService::init_das_dml_rtdef(ObDMLRtCtx &dml_rtctx,
}
}
}
if (ObSQLUtils::is_fk_nested_sql(&dml_rtctx.get_exec_ctx())) {
das_rtdef.is_for_foreign_key_check_ = true;
}
return ret;
}

View File

@ -87,6 +87,8 @@ int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
LOG_WARN("is_self_ref_row failed", K(ret), K(old_row), K(fk_arg));
} else if (new_row.empty() && is_self_ref && op.is_fk_nested_session()) {
// delete self refercnced row should not cascade delete.
} else if (OB_FAIL(check_exist_inner_sql(op, fk_arg, old_row, true, true))) {
LOG_WARN("check exist before cascade failed", K(ret), K(fk_arg), K(old_row));
} else if (OB_FAIL(cascade(op, fk_arg, old_row, new_row))) {
LOG_WARN("failed to cascade", K(ret), K(fk_arg), K(old_row), K(new_row));
} else if (!new_row.empty() && is_self_ref) {
@ -114,7 +116,9 @@ int ForeignKeyHandle::do_handle(ObTableModifyOp &op,
}
}
} else if (ACTION_SET_NULL == fk_arg.ref_action_) {
if (OB_FAIL(set_null(op, fk_arg, old_row))) {
if (OB_FAIL(check_exist_inner_sql(op, fk_arg, old_row, true, true))) {
LOG_WARN("check exist before cascade failed", K(ret), K(fk_arg), K(old_row));
} else if (OB_FAIL(set_null(op, fk_arg, old_row))) {
LOG_WARN("failed to perform set null for foreign key", K(ret));
}
}
@ -159,20 +163,24 @@ int ForeignKeyHandle::check_exist(ObTableModifyOp &modify_op, const ObForeignKey
const ObExprPtrIArray &row, ObForeignKeyChecker *fk_checker, bool expect_zero)
{
int ret = OB_SUCCESS;
DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK);
if (!expect_zero) {
ret = check_exist_scan_task(modify_op, fk_arg, row, fk_checker, expect_zero);
ret = check_exist_scan_task(modify_op, fk_arg, row, fk_checker);
} else {
ret = check_exist_inner_sql(modify_op, fk_arg, row, expect_zero);
if (OB_FAIL(check_exist_inner_sql(modify_op, fk_arg, row, expect_zero, true))) {
LOG_WARN("check exist and iter uncommmited row meet failed", K(ret));
} else if (OB_FAIL(check_exist_inner_sql(modify_op, fk_arg, row, expect_zero, false))) {
LOG_WARN("check exist and iter commmited row meet failed", K(ret));
}
}
return ret;
}
int ForeignKeyHandle::check_exist_scan_task(ObTableModifyOp &modify_op, const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row, ObForeignKeyChecker *fk_checker, bool expect_zero)
const ObExprPtrIArray &row, ObForeignKeyChecker *fk_checker)
{
int ret = OB_SUCCESS;
bool has_result = false;
DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK);
if (OB_ISNULL(fk_checker)) {
ret = OB_ERR_UNEXPECTED;
LOG_WARN("foreign key checker is nullptr", K(ret));
@ -189,16 +197,16 @@ int ForeignKeyHandle::check_exist_scan_task(ObTableModifyOp &modify_op, const Ob
}
int ForeignKeyHandle::check_exist_inner_sql(ObTableModifyOp &op,
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row,
bool expect_zero)
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row,
bool expect_zero,
bool iter_uncommitted_row)
{
DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK);
int ret = OB_SUCCESS;
static const char *SELECT_FMT_MYSQL =
"select /*+ no_parallel */ 1 from `%.*s`.`%.*s` where %.*s limit 2";
"select /*+ no_parallel */ 1 from `%.*s`.`%.*s` where %.*s limit 2 for update";
static const char *SELECT_FMT_ORACLE =
"select /*+ no_parallel */ 1 from \"%.*s\".\"%.*s\" where %.*s and rownum <= 2";
"select /*+ no_parallel */ 1 from \"%.*s\".\"%.*s\" where %.*s and rownum <= 2 for update";
const char *select_fmt = lib::is_mysql_mode() ? SELECT_FMT_MYSQL : SELECT_FMT_ORACLE;
ObArenaAllocator alloc(ObModIds::OB_MODULE_PAGE_ALLOCATOR,
OB_MALLOC_NORMAL_BLOCK_SIZE,
@ -232,6 +240,9 @@ int ForeignKeyHandle::check_exist_inner_sql(ObTableModifyOp &op,
stmt_buf[stmt_pos++] = 0;
}
if (OB_SUCC(ret) && stmt_pos > 0) {
if (iter_uncommitted_row) {
op.get_exec_ctx().get_das_ctx().iter_uncommitted_row_ = true;
}
LOG_DEBUG("foreign_key_check_exist", "stmt", stmt_buf, K(row), K(fk_arg));
SMART_VAR(ObMySQLProxy::MySQLResult, res) {
if (OB_FAIL(op.begin_nested_session(fk_arg.is_self_ref_))) {
@ -282,14 +293,16 @@ int ForeignKeyHandle::check_exist_inner_sql(ObTableModifyOp &op,
* is true, then is_zero is false, need to exclude the case of self reference and
* only affect one row. other cases return OB_ERR_ROW_IS_REFERENCED.
*/
if (OB_FAIL(is_self_ref_row(op.get_eval_ctx(), row, fk_arg, is_self_ref))) {
LOG_WARN("is_self_ref_row failed", K(ret), K(row), K(fk_arg));
} else if (is_zero && !is_self_ref) {
ret = OB_ERR_NO_REFERENCED_ROW;
LOG_WARN("parent row is not exist", K(ret), K(fk_arg), K(row));
} else if (!is_zero) {
ret = OB_ERR_ROW_IS_REFERENCED;
LOG_WARN("child row is exist", K(ret), K(fk_arg), K(row));
if (!iter_uncommitted_row) {
if (OB_FAIL(is_self_ref_row(op.get_eval_ctx(), row, fk_arg, is_self_ref))) {
LOG_WARN("is_self_ref_row failed", K(ret), K(row), K(fk_arg));
} else if (is_zero && !is_self_ref) {
ret = OB_ERR_NO_REFERENCED_ROW;
LOG_WARN("parent row is not exist", K(ret), K(fk_arg), K(row));
} else if (!is_zero) {
ret = OB_ERR_ROW_IS_REFERENCED;
LOG_WARN("child row is exist", K(ret), K(fk_arg), K(row));
}
}
}
}
@ -318,6 +331,8 @@ int ForeignKeyHandle::check_exist_inner_sql(ObTableModifyOp &op,
}
}
}
op.get_exec_ctx().get_das_ctx().iter_uncommitted_row_ = false;
return ret;
}
@ -1293,6 +1308,7 @@ int ObTableModifyOp::inner_get_next_row()
int ObTableModifyOp::perform_batch_fk_check()
{
DEBUG_SYNC(BEFORE_FOREIGN_KEY_CONSTRAINT_CHECK);
int ret = OB_SUCCESS;
for (int64_t i = 0; OB_SUCC(ret) && i < fk_checkers_.count(); ++i) {
bool all_has_result = false;

View File

@ -51,12 +51,12 @@ private:
static int check_exist_inner_sql(ObTableModifyOp &modify_op,
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row,
bool expect_zero);
bool expect_zero,
bool iter_uncommitted_row);
static int check_exist_scan_task(ObTableModifyOp &modify_op,
const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &row,
ObForeignKeyChecker *fk_checker,
bool expect_zero);
ObForeignKeyChecker *fk_checker);
static int cascade(ObTableModifyOp &modify_op, const ObForeignKeyArg &fk_arg,
const ObExprPtrIArray &old_row, const ObExprPtrIArray &new_row);

View File

@ -890,7 +890,9 @@ OB_INLINE int ObTableScanOp::init_das_scan_rtdef(const ObDASScanCtDef &das_ctdef
das_rtdef.scan_flag_.is_show_seed_ = plan_ctx->get_show_seed();
if(is_foreign_check_nested_session()) {
das_rtdef.is_for_foreign_check_ = true;
das_rtdef.scan_flag_.set_for_foreign_key_check();
if (plan_ctx->get_phy_plan()->has_for_update() && ObSQLUtils::is_iter_uncommitted_row(&ctx_)) {
das_rtdef.scan_flag_.set_iter_uncommitted_row();
}
}
if (MY_SPEC.batch_scan_flag_ || is_lookup) {
das_rtdef.scan_flag_.scan_order_ = ObQueryFlag::KeepOrder;

View File

@ -4437,6 +4437,17 @@ bool ObSQLUtils::is_fk_nested_sql(ObExecContext *cur_ctx)
return bret;
}
bool ObSQLUtils::is_iter_uncommitted_row(ObExecContext *cur_ctx)
{
bool bret = false;
if (cur_ctx != nullptr &&
cur_ctx->get_parent_ctx() != nullptr &&
cur_ctx->get_parent_ctx()->get_das_ctx().iter_uncommitted_row_) {
bret = true;
}
return bret;
}
//notice: if a SQL is triggered by a PL defined as an autonomous transaction,
//then it is not nested sql, nor is it restricted by the constraints of nested sql
bool ObSQLUtils::is_nested_sql(ObExecContext *cur_ctx)

View File

@ -577,6 +577,7 @@ public:
static bool is_support_batch_exec(ObItemType type);
static bool is_pl_nested_sql(ObExecContext *cur_ctx);
static bool is_fk_nested_sql(ObExecContext *cur_ctx);
static bool is_iter_uncommitted_row(ObExecContext *cur_ctx);
static bool is_nested_sql(ObExecContext *cur_ctx);
static bool is_select_from_dual(ObExecContext &ctx);

View File

@ -3486,6 +3486,9 @@ int ObLSTabletService::need_check_old_row_legitimacy(ObDMLRunningCtx &run_ctx,
//index can not be read during building index, so does not check old index row
need_check = false;
}
if (ObDmlFlag::DF_LOCK == run_ctx.dml_flag_) {
need_check = false;
}
}
return ret;
}

View File

@ -131,7 +131,7 @@ int ObMvccEngine::get(ObMvccAccessCtx &ctx,
TRANS_LOG(WARN, "fail to try to compact row", K(tmp_ret));
}
} else if (query_flag.is_for_foreign_key_check()) {
ret = ObRowConflictHandler::check_foreign_key_constraint_for_memtable(&ctx, value, lock_state);
ret = ObRowConflictHandler::check_foreign_key_constraint_for_memtable(ctx, value, lock_state);
} else {
// do nothing
}

View File

@ -56,15 +56,13 @@ int ObMvccValueIterator::init(ObMvccAccessCtx &ctx,
is_inited_ = true;
}
}
TRANS_LOG(TRACE, "value_iter.init", K(ret),
KPC(value),
KPC_(version_iter),
K(query_flag.is_read_latest()),
KPC(key),
K(ctx),
K(lbt()));
KPC(value),
KPC_(version_iter),
K(query_flag.is_read_latest()),
KPC(key),
K(ctx),
K(lbt()));
return ret;
}
@ -420,7 +418,7 @@ int ObMvccRowIterator::get_next_row(
TRANS_LOG(ERROR, "unexpected value null pointer", "ctx", *ctx_);
ret = OB_ERR_UNEXPECTED;
} else if (query_flag_.is_for_foreign_key_check()) {
if (OB_FAIL(ObRowConflictHandler::check_foreign_key_constraint_for_memtable(ctx_, value, lock_state))) {
if (OB_FAIL(ObRowConflictHandler::check_foreign_key_constraint_for_memtable(*ctx_, value, lock_state))) {
// we will throw error code if it's failed here, but we need to
// post lock with key outside, so we have to set it here.
key = tmp_key;
@ -498,4 +496,3 @@ int ObMvccRowIterator::try_purge(const ObTxSnapshot &snapshot_info,
} // namespace memtable
} // namespace oceanbase

View File

@ -30,7 +30,8 @@ struct ObWriteFlag
#define OBWF_BIT_DML_BATCH_OPT 1
#define OBWF_BIT_INSERT_UP 1
#define OBWF_BIT_WRITE_ONLY_INDEX 1
#define OBWF_BIT_RESERVED 61
#define OBWF_BIT_CHECK_ROW_LOCKED 1
#define OBWF_BIT_RESERVED 57
static const uint64_t OBWF_MASK_TABLE_API = (0x1UL << OBWF_BIT_TABLE_API) - 1;
static const uint64_t OBWF_MASK_TABLE_LOCK = (0x1UL << OBWF_BIT_TABLE_LOCK) - 1;
@ -38,6 +39,7 @@ struct ObWriteFlag
static const uint64_t OBWF_MASK_DML_BATCH_OPT = (0x1UL << OBWF_BIT_DML_BATCH_OPT) - 1;
static const uint64_t OBWF_MASK_INSERT_UP = (0x1UL << OBWF_BIT_INSERT_UP) - 1;
static const uint64_t OBWF_MASK_WRITE_ONLY_INDEX = (0x1UL << OBWF_BIT_WRITE_ONLY_INDEX) - 1;
static const uint64_t OBWF_MASK_CHECK_ROW_LOCKED = (0x1UL << OBWF_BIT_CHECK_ROW_LOCKED) - 1;
union
{
@ -50,6 +52,7 @@ struct ObWriteFlag
uint64_t is_dml_batch_opt_ : OBWF_BIT_DML_BATCH_OPT; // 0: false(default), 1: true
uint64_t is_insert_up_ : OBWF_BIT_INSERT_UP; // 0: false(default), 1: true
uint64_t is_write_only_index_ : OBWF_BIT_WRITE_ONLY_INDEX; // 0: false(default), 1: true
uint64_t is_check_row_locked_ : OBWF_BIT_CHECK_ROW_LOCKED; // 0: false(default), 1: true
uint64_t reserved_ : OBWF_BIT_RESERVED;
};
};
@ -68,13 +71,16 @@ struct ObWriteFlag
inline void set_is_insert_up() { is_insert_up_ = true; }
inline bool is_write_only_index() const { return is_write_only_index_; }
inline void set_is_write_only_index() { is_write_only_index_ = true; }
inline bool is_check_row_locked() const { return is_check_row_locked_; }
inline void set_check_row_locked() { is_check_row_locked_ = true; }
TO_STRING_KV("is_table_api", is_table_api_,
"is_table_lock", is_table_lock_,
"is_mds", is_mds_,
"is_dml_batch_opt", is_dml_batch_opt_,
"is_insert_up", is_insert_up_,
"is_write_only_index", is_write_only_index_);
"is_write_only_index", is_write_only_index_,
"is_check_row_locked", is_check_row_locked_);
OB_UNIS_VERSION(1);
};

View File

@ -464,94 +464,6 @@ int ObMemtable::lock(
return ret;
}
int ObMemtable::check_row_locked_by_myself(
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
bool &locked)
{
int ret = OB_SUCCESS;
ObMemtableKey mtk;
ObMvccWriteGuard guard;
ObStoreCtx *ctx = context.store_ctx_;
if (IS_NOT_INIT) {
TRANS_LOG(WARN, "not init", K(*this));
ret = OB_NOT_INIT;
} else if (!ctx->mvcc_acc_ctx_.is_write() || !rowkey.is_memtable_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid param", K(ret), KP(ctx), K(rowkey));
} else if (OB_ISNULL(ctx->table_iter_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tables handle or iterator in context is null", K(ret), K(ctx));
} else if (OB_FAIL(guard.write_auth(*ctx))) {
TRANS_LOG(WARN, "not allow to write", KP(ctx));
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey.get_store_rowkey()))) {
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
} else {
bool is_locked = false;
bool tmp_is_locked = false;
ObStoreRowLockState lock_state;
const ObIArray<ObITable *> *stores = nullptr;
common::ObSEArray<ObITable *, 4> iter_tables;
ctx->table_iter_->resume();
ObTransID my_tx_id = ctx->mvcc_acc_ctx_.get_tx_id();
while (OB_SUCC(ret)) {
ObITable *table_ptr = nullptr;
if (OB_FAIL(ctx->table_iter_->get_next(table_ptr))) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "failed to get next tables", K(ret));
}
} else if (OB_ISNULL(table_ptr)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "table must not be null", K(ret), KPC(ctx->table_iter_));
} else if (OB_FAIL(iter_tables.push_back(table_ptr))) {
TRANS_LOG(WARN, "rowkey_exists check::", K(ret), KPC(table_ptr));
}
} // end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) {
stores = &iter_tables;
for (int64_t i = stores->count() - 1; OB_SUCC(ret) && !is_locked && i >= 0; i--) {
lock_state.reset();
if (NULL == stores->at(i)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ObIStore is null", K(ret), K(i));
} else if (stores->at(i)->is_data_memtable()) {
ObMemtable *memtable = static_cast<ObMemtable *>(stores->at(i));
if (OB_FAIL(memtable->get_mvcc_engine().check_row_locked(ctx->mvcc_acc_ctx_,
&mtk,
lock_state))) {
TRANS_LOG(WARN, "mvcc engine check row lock fail", K(ret), K(mtk));
} else if (lock_state.is_locked_ && lock_state.lock_trans_id_ == my_tx_id) {
is_locked = true;
}
} else if (stores->at(i)->is_sstable()) {
ObSSTable *sstable = static_cast<ObSSTable *>(stores->at(i));
if (OB_FAIL(sstable->check_row_locked(param, context, rowkey, lock_state))) {
TRANS_LOG(WARN, "sstable check row lock fail", K(ret), K(rowkey));
} else if (lock_state.is_locked_ && lock_state.lock_trans_id_ == my_tx_id) {
is_locked = true;
}
TRANS_LOG(DEBUG, "check_row_locked meet sstable",
K(ret), K(rowkey), K(*sstable), K(is_locked));
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unknown store type", K(ret));
}
}
}
if (OB_SUCC(ret)) {
locked = is_locked;
}
}
return ret;
}
////////////////////////////////////////////////////////////////////////////////////////////////////
void ObMemtable::get_begin(ObMvccAccessCtx &ctx)
{
@ -1115,13 +1027,12 @@ int ObMemtable::replay_row(ObStoreCtx &ctx,
////////////////////////////////////////////////////////////////////////////////////////////////////
int ObMemtable::lock_row_on_frozen_stores_(
const storage::ObTableIterParam &param,
const ObTxNodeArg &arg,
storage::ObTableAccessContext &context,
const ObMemtableKey *key,
ObMvccRow *value,
ObMvccWriteResult &res)
int ObMemtable::lock_row_on_frozen_stores_(const storage::ObTableIterParam &param,
const ObTxNodeArg &arg,
storage::ObTableAccessContext &context,
const ObMemtableKey *key,
ObMvccRow *value,
ObMvccWriteResult &res)
{
int ret = OB_SUCCESS;
ObStoreRowLockState &lock_state = res.lock_state_;
@ -2732,7 +2643,6 @@ int ObMemtable::set_(
return ret;
}
int ObMemtable::lock_(
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
@ -2751,24 +2661,26 @@ int ObMemtable::lock_(
TRANS_LOG(WARN, "Failed to writer rowkey", K(ret), K(rowkey));
} else {
ObMemtableData mtd(blocksstable::ObDmlFlag::DF_LOCK, len, buf);
ObTxNodeArg arg(&mtd, /*memtable_data*/
NULL, /*old_data*/
timestamp_, /*memstore_version*/
context.store_ctx_->mvcc_acc_ctx_.tx_scn_, /*seq_no*/
rowkey.get_obj_cnt() /*column_cnt*/);
if (OB_FAIL(mvcc_write_(param, context, &mtk, arg, is_new_locked))) {
} else if (OB_UNLIKELY(!is_new_locked)) {
TRANS_LOG(DEBUG, "lock twice, no need to store lock trans node");
}
ObTxNodeArg arg(&mtd, /*memtable_data*/
NULL, /*old_data*/
timestamp_, /*memstore_version*/
context.store_ctx_->mvcc_acc_ctx_.tx_scn_, /*seq_no*/
rowkey.get_obj_cnt()); /*column_cnt*/
if (context.store_ctx_->mvcc_acc_ctx_.write_flag_.is_check_row_locked()) {
if (OB_FAIL(ObRowConflictHandler::check_foreign_key_constraint(param, context, rowkey))) {
if (OB_TRY_LOCK_ROW_CONFLICT != ret && OB_TRANSACTION_SET_VIOLATION != ret) {
TRANS_LOG(WARN, "meet unexpected return code in check_row_locked", K(ret), K(context), K(mtk));
}
}
} else if (OB_FAIL(mvcc_write_(param, context, &mtk, arg, is_new_locked))) {
} else if (OB_UNLIKELY(!is_new_locked)) {
TRANS_LOG(DEBUG, "lock twice, no need to store lock trans node");
}
}
return ret;
}
int ObMemtable::mvcc_replay_(storage::ObStoreCtx &ctx,
const ObMemtableKey *key,
const ObTxNodeArg &arg)

View File

@ -290,18 +290,6 @@ public:
virtual int replay_schema_version_change_log(
const int64_t schema_version);
// check_row_locked_by_myself check whether lock is locked by myself
// ctx is the locker tx's context, we need the tx_id for check
// tablet_id is necessary for the query_engine's key engine(NB: do we need it now?)
// columns is the schema of the new_row, it contains the row key
// rowkey is the row key used for lock
// locked returns whether lock is locked by myself
int check_row_locked_by_myself(
const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
bool &locked);
// // TODO: ==================== Memtable Other Interface ==================
int set_freezer(storage::ObFreezer *handler);
storage::ObFreezer *get_freezer() { return freezer_; }
@ -509,7 +497,6 @@ private:
int mvcc_replay_(storage::ObStoreCtx &ctx,
const ObMemtableKey *key,
const ObTxNodeArg &arg);
int lock_row_on_frozen_stores_(
const storage::ObTableIterParam &param,
const ObTxNodeArg &arg,
@ -542,8 +529,6 @@ private:
storage::ObTableAccessContext &context,
const common::ObStoreRowkey &rowkey);
int post_row_write_conflict_(ObMvccAccessCtx &acc_ctx,
const ObMemtableKey &row_key,
storage::ObStoreRowLockState &lock_state,

View File

@ -1439,7 +1439,8 @@ OB_INLINE int ObReadRow::iterate_row_value_(
const ObTransID reader_tx_id = value_iter.get_reader_tx_id();
share::SCN row_version = tx_node->trans_version_;
row_scn = row_version.get_val_for_tx();
if (!value_iter.get_mvcc_acc_ctx()->is_standby_read_
if (!row.is_have_uncommited_row() &&
!value_iter.get_mvcc_acc_ctx()->is_standby_read_
&& !(snapshot_tx_id == tx_node->get_tx_id() || reader_tx_id == tx_node->get_tx_id())
&& row_version.is_max()) {
TRANS_LOG(ERROR, "meet row scn with undecided value", KPC(tx_node),

View File

@ -11,6 +11,8 @@
*/
#define USING_LOG_PREFIX TRANS
#include "ob_row_conflict_handler.h"
#include "storage/memtable/ob_memtable.h"
#include "storage/blocksstable/ob_sstable.h"
#include "storage/memtable/mvcc/ob_mvcc_iterator.h"
#include "storage/memtable/ob_lock_wait_mgr.h"
#include "storage/tx_table/ob_tx_table_guards.h"
@ -20,19 +22,166 @@ using namespace common;
using namespace memtable;
using namespace transaction;
namespace storage {
int ObRowConflictHandler::check_foreign_key_constraint_for_memtable(ObMvccAccessCtx *ctx,
ObMvccRow *row,
int ObRowConflictHandler::check_row_locked(const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
const bool by_myself,
const bool post_lock)
{
int ret = OB_SUCCESS;
ObStoreRowLockState lock_state;
ObMvccAccessCtx acc_ctx = context.store_ctx_->mvcc_acc_ctx_;
share::SCN max_trans_version = share::SCN::min_scn();
const ObTransID my_tx_id = acc_ctx.get_tx_id();
const share::SCN snapshot_version = acc_ctx.get_snapshot_version();
if (OB_FAIL(check_row_locked(param, context, rowkey, lock_state, max_trans_version))) {
LOG_WARN("check row locked failed", K(ret), K(context), K(rowkey));
} else {
if (lock_state.is_locked_) {
if ((by_myself && lock_state.lock_trans_id_ == my_tx_id)
|| (!by_myself && lock_state.lock_trans_id_ != my_tx_id)) {
ret = OB_TRY_LOCK_ROW_CONFLICT;
if (post_lock) {
post_row_read_conflict(acc_ctx,
rowkey.get_store_rowkey(),
lock_state,
context.tablet_id_,
context.ls_id_,
0,
0, /* these two params get from mvcc_row, and for statistics, so we ignore them */
lock_state.trans_scn_);
}
}
} else if (max_trans_version > snapshot_version) {
ret = OB_TRANSACTION_SET_VIOLATION;
}
}
return ret;
}
int ObRowConflictHandler::check_row_locked(const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
ObStoreRowLockState &lock_state,
share::SCN &max_trans_version)
{
int ret = OB_SUCCESS;
ObMemtableKey mtk;
ObMvccWriteGuard guard;
ObStoreCtx *ctx = context.store_ctx_;
if (!ctx->mvcc_acc_ctx_.is_write() || !rowkey.is_memtable_valid()) {
ret = OB_INVALID_ARGUMENT;
TRANS_LOG(WARN, "invalid param", K(ret), KP(ctx), K(rowkey));
} else if (OB_ISNULL(ctx->table_iter_)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "tables handle or iterator in context is null", K(ret), K(ctx));
} else if (OB_FAIL(guard.write_auth(*ctx))) {
TRANS_LOG(WARN, "not allow to write", KP(ctx));
} else if (OB_FAIL(mtk.encode(param.get_read_info()->get_columns_desc(), &rowkey.get_store_rowkey()))) {
TRANS_LOG(WARN, "mtk encode fail", "ret", ret);
} else {
const ObIArray<ObITable *> *stores = nullptr;
common::ObSEArray<ObITable *, 4> iter_tables;
ctx->table_iter_->resume();
while (OB_SUCC(ret)) {
ObITable *table_ptr = nullptr;
if (OB_FAIL(ctx->table_iter_->get_next(table_ptr))) {
if (OB_ITER_END != ret) {
TRANS_LOG(WARN, "failed to get next tables", K(ret));
}
} else if (OB_ISNULL(table_ptr)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "table must not be null", K(ret), KPC(ctx->table_iter_));
} else if (OB_FAIL(iter_tables.push_back(table_ptr))) {
TRANS_LOG(WARN, "rowkey_exists check::", K(ret), KPC(table_ptr));
}
} // end while
if (OB_ITER_END == ret) {
ret = OB_SUCCESS;
}
if (OB_SUCC(ret)) {
share::SCN snapshot_version = ctx->mvcc_acc_ctx_.get_snapshot_version();
stores = &iter_tables;
for (int64_t i = stores->count() - 1; OB_SUCC(ret) && i >= 0; i--) {
lock_state.reset();
if (NULL == stores->at(i)) {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(WARN, "ObIStore is null", K(ret), K(i));
} else if (stores->at(i)->is_data_memtable()) {
ObMemtable *memtable = static_cast<ObMemtable *>(stores->at(i));
if (OB_FAIL(memtable->get_mvcc_engine().check_row_locked(ctx->mvcc_acc_ctx_, &mtk, lock_state))) {
TRANS_LOG(WARN, "mvcc engine check row lock fail", K(ret), K(mtk));
}
} else if (stores->at(i)->is_sstable()) {
blocksstable::ObSSTable *sstable = static_cast<blocksstable::ObSSTable *>(stores->at(i));
if (OB_FAIL(sstable->check_row_locked(param, context, rowkey, lock_state))) {
TRANS_LOG(WARN, "sstable check row lock fail", K(ret), K(rowkey));
}
TRANS_LOG(DEBUG, "check_row_locked meet sstable", K(ret), K(rowkey), K(*sstable));
} else {
ret = OB_ERR_UNEXPECTED;
TRANS_LOG(ERROR, "unknown store type", K(ret));
}
if (OB_SUCC(ret)) {
if (lock_state.is_locked_) {
break;
} else if (max_trans_version < lock_state.trans_version_) {
max_trans_version = lock_state.trans_version_;
}
}
}
}
}
return ret;
}
int ObRowConflictHandler::check_foreign_key_constraint(const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const common::ObStoreRowkey &rowkey)
{
int ret = OB_SUCCESS;
ObMvccAccessCtx acc_ctx = context.store_ctx_->mvcc_acc_ctx_;
blocksstable::ObDatumRowkeyHelper rowkey_converter;
blocksstable::ObDatumRowkey datum_rowkey;
if (OB_FAIL(rowkey_converter.convert_datum_rowkey(rowkey.get_rowkey(), datum_rowkey))) {
STORAGE_LOG(WARN, "Failed to convert datum rowkey", K(ret), K(rowkey));
} else if (OB_FAIL(check_row_locked(param, context, datum_rowkey, false /* by_myself */, true /* post_lock */))) {
if (OB_TRY_LOCK_ROW_CONFLICT == ret) {
if (REACH_TIME_INTERVAL(1000 * 1000)) {
TRANS_LOG(
WARN, "meet lock conflict during check foreign key constraint", K(ret), K(acc_ctx.get_tx_id()), K(rowkey));
}
} else if (OB_TRANSACTION_SET_VIOLATION == ret) {
if (REACH_TIME_INTERVAL(1000 * 1000)) {
TRANS_LOG(WARN,
"meet tsc during check foreign key constraint",
K(ret),
K(acc_ctx.get_tx_id()),
K(acc_ctx.get_snapshot_version()));
}
} else {
TRANS_LOG(WARN, "check row locked failed", K(param), K(context), K(rowkey));
}
}
return ret;
}
int ObRowConflictHandler::check_foreign_key_constraint_for_memtable(ObMvccAccessCtx &ctx,
ObMvccRow *value,
ObStoreRowLockState &lock_state)
{
int ret = OB_SUCCESS;
if (OB_ISNULL(row)) {
if (OB_ISNULL(value)) {
ret = OB_BAD_NULL_ERROR;
TRANS_LOG(ERROR, "the ObMvccValueIterator is null", K(ret));
} else if (OB_FAIL(row->check_row_locked(*ctx, lock_state))) {
} else if (OB_FAIL(value->check_row_locked(ctx, lock_state))) {
TRANS_LOG(WARN, "check row locked fail", K(ret), K(lock_state));
} else {
const ObTransID my_tx_id = ctx->get_tx_id();
const share::SCN snapshot_version = ctx->get_snapshot_version();
const ObTransID my_tx_id = ctx.get_tx_id();
const share::SCN snapshot_version = ctx.get_snapshot_version();
if (lock_state.is_locked_ && my_tx_id != lock_state.lock_trans_id_) {
ret = OB_TRY_LOCK_ROW_CONFLICT;
if (REACH_TIME_INTERVAL(1000 * 1000)) {
@ -169,6 +318,5 @@ int ObRowConflictHandler::post_row_read_conflict(ObMvccAccessCtx &acc_ctx,
}
return ret;
}
}
}
} // namespace storage
} // namespace oceanbase

View File

@ -16,8 +16,10 @@
#include <stdint.h>
namespace oceanbase {
namespace memtable {
class ObMvccRow;
class ObMvccAccessCtx;
class ObMvccRow;
class ObMvccValueIterator;
}
namespace transaction {
class ObTransID;
@ -32,12 +34,28 @@ namespace share {
class ObLSID;
class SCN;
}
namespace storage {
namespace blocksstable {
class ObDatumRowkey;
}
namespace storage
{
class ObTableIterParam;
class ObTableAccessContext;
class ObStoreRowLockState;
class ObTxTableGuards;
class ObRowConflictHandler {
public:
static int check_row_locked(const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
const bool by_myself = false,
const bool post_lock = false);
static int check_row_locked(const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const blocksstable::ObDatumRowkey &rowkey,
ObStoreRowLockState &lock_state,
share::SCN &max_trans_veresion);
// There are 2 cases that can lead to row conflict in foreign key constraint check:
// Case 1: the row is locked, mainly beacuse there's an uncommitted transaction on it.
// If the check meet this case, we should call post_row_read_conflict to put it into
@ -47,8 +65,11 @@ public:
// snapshot_version of current transaction, it will cause tsc.
// If the check meet this case, we should return error code to sql layer, and it will
// choose to retry or throw an exception according to the isolation level.
static int check_foreign_key_constraint_for_memtable(memtable::ObMvccAccessCtx *ctx,
memtable::ObMvccRow *row,
static int check_foreign_key_constraint(const storage::ObTableIterParam &param,
storage::ObTableAccessContext &context,
const common::ObStoreRowkey &rowkey);
static int check_foreign_key_constraint_for_memtable(memtable::ObMvccAccessCtx &acc_ctx,
memtable::ObMvccRow *value,
storage::ObStoreRowLockState &lock_state);
static int check_foreign_key_constraint_for_sstable(storage::ObTxTableGuards &tx_table_guards,
const transaction::ObTransID &read_trans_id,
@ -71,6 +92,6 @@ public:
const int64_t total_trans_node_cnt,
const share::SCN &trans_scn);
};
}
}
} // namespace storage
} // namespace oceanbase
#endif

View File

@ -71,6 +71,7 @@
#include "storage/tx/ob_trans_service.h"
#include "storage/tx_storage/ob_ls_service.h"
#include "storage/compaction/ob_medium_list_checker.h"
#include "storage/memtable/ob_row_conflict_handler.h"
namespace oceanbase
{
@ -2382,19 +2383,25 @@ int ObTablet::check_row_locked_by_myself(
bool &locked)
{
int ret = OB_SUCCESS;
int tmp_ret = OB_SUCCESS;
ObArenaAllocator allocator(common::ObMemAttr(MTL_ID(), ObModIds::OB_STORE_ROW_LOCK_CHECKER));
ObMemtable *write_memtable = nullptr;
ObTableIterParam param;
ObTableAccessContext context;
locked = false;
if (OB_FAIL(prepare_memtable(relative_table, store_ctx, write_memtable))) {
LOG_WARN("prepare write memtable fail", K(ret), K(relative_table));
} else if (OB_FAIL(prepare_param_ctx(allocator, relative_table, store_ctx, param, context))) {
LOG_WARN("prepare param context fail, ", K(ret), K(rowkey));
} else if (OB_FAIL(write_memtable->check_row_locked_by_myself(param, context, rowkey, locked))) {
LOG_WARN("failed to lock write memtable", K(ret), K(rowkey));
} else if (OB_TMP_FAIL(ObRowConflictHandler::check_row_locked(param, context, rowkey, true /* by_myself */))) {
if (OB_TRY_LOCK_ROW_CONFLICT == tmp_ret) {
locked = true;
} else if (OB_TRANSACTION_SET_VIOLATION != tmp_ret) {
ret = tmp_ret;
LOG_WARN("failed to check row locked by myself", K(tmp_ret), K(rowkey));
}
}
return ret;
}