store/tikv: remove use of SchemaLease transaction option in store/tikv (#24331)

This commit is contained in:
disksing
2021-04-29 02:39:56 +08:00
committed by GitHub
parent 91ca4eafdc
commit 75be70cd4c
3 changed files with 15 additions and 7 deletions

View File

@ -129,6 +129,8 @@ func (txn *tikvTxn) SetOption(opt int, val interface{}) {
txn: txn.KVTxn,
binInfo: val.(*binloginfo.BinlogInfo), // val cannot be other type.
})
case tikvstore.SchemaChecker:
txn.SetSchemaLeaseChecker(val.(tikv.SchemaLeaseChecker))
case tikvstore.IsolationLevel:
level := getTiKVIsolationLevel(val.(kv.IsoLevel))
txn.KVTxn.GetSnapshot().SetIsolationLevel(level)

View File

@ -1252,7 +1252,8 @@ type SchemaVer interface {
SchemaMetaVersion() int64
}
type schemaLeaseChecker interface {
// SchemaLeaseChecker is used to validate schema version is not changed during transaction execution.
type SchemaLeaseChecker interface {
// CheckBySchemaVer checks if the schema has changed for the transaction related tables between the startSchemaVer
// and the schema version at txnTS, all the related schema changes will be returned.
CheckBySchemaVer(txnTS uint64, startSchemaVer SchemaVer) (*RelatedSchemaChange, error)
@ -1398,8 +1399,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64
err := errors.Errorf("mock check schema valid failure")
failpoint.Return(nil, false, err)
})
checker, ok := c.txn.us.GetOption(kv.SchemaChecker).(schemaLeaseChecker)
if !ok {
if c.txn.schemaLeaseChecker == nil {
if c.sessionID > 0 {
logutil.Logger(ctx).Warn("schemaLeaseChecker is not set for this transaction",
zap.Uint64("sessionID", c.sessionID),
@ -1408,7 +1408,7 @@ func (c *twoPhaseCommitter) checkSchemaValid(ctx context.Context, checkTS uint64
}
return nil, false, nil
}
relatedChanges, err := checker.CheckBySchemaVer(checkTS, startInfoSchema)
relatedChanges, err := c.txn.schemaLeaseChecker.CheckBySchemaVer(checkTS, startInfoSchema)
if err != nil {
if tryAmend && relatedChanges != nil && relatedChanges.Amendable && c.txn.schemaAmender != nil {
memAmended, amendErr := c.tryAmendTxn(ctx, startInfoSchema, relatedChanges)

View File

@ -74,9 +74,10 @@ type KVTxn struct {
// commitCallback is called after current transaction gets committed
commitCallback func(info string, err error)
binlog BinlogExecutor
isPessimistic bool
kvFilter KVFilter
binlog BinlogExecutor
schemaLeaseChecker SchemaLeaseChecker
isPessimistic bool
kvFilter KVFilter
}
func newTiKVTxn(store *KVStore, txnScope string) (*KVTxn, error) {
@ -200,6 +201,11 @@ func (txn *KVTxn) DelOption(opt int) {
txn.us.DelOption(opt)
}
// SetSchemaLeaseChecker sets a hook to check schema version.
func (txn *KVTxn) SetSchemaLeaseChecker(checker SchemaLeaseChecker) {
txn.schemaLeaseChecker = checker
}
// SetPessimistic indicates if the transaction should use pessimictic lock.
func (txn *KVTxn) SetPessimistic(b bool) {
txn.isPessimistic = b