tables: Address comment and refactor AddRecord methord
This commit is contained in:
@ -413,61 +413,10 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64,
|
||||
bs := kv.NewBufferStore(txn)
|
||||
defer bs.Release()
|
||||
|
||||
if t.meta.PKIsHandle {
|
||||
// Check key exists.
|
||||
recordKey := t.RecordKey(recordID, nil)
|
||||
e := kv.ErrKeyExists.Gen("Duplicate entry '%d' for key 'PRIMARY'", recordID)
|
||||
txn.SetOption(kv.PresumeKeyNotExistsError, e)
|
||||
_, err = txn.Get(recordKey)
|
||||
if err == nil {
|
||||
return recordID, errors.Trace(e)
|
||||
} else if !terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.DelOption(kv.PresumeKeyNotExistsError)
|
||||
}
|
||||
|
||||
for _, v := range t.indices {
|
||||
if v == nil || v.State == model.StateDeleteOnly || v.State == model.StateDeleteReorganization {
|
||||
// if index is in delete only or delete reorganization state, we can't add it.
|
||||
continue
|
||||
}
|
||||
colVals, _ := v.FetchValues(r)
|
||||
|
||||
if v.Unique || v.Primary {
|
||||
// Pass pre-composed error to txn.
|
||||
strVals := make([]string, 0, len(colVals))
|
||||
for _, cv := range colVals {
|
||||
cvs := "NULL"
|
||||
if cv != nil {
|
||||
cvs, err = types.ToString(cv)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
}
|
||||
strVals = append(strVals, cvs)
|
||||
}
|
||||
entryKey := strings.Join(strVals, "-")
|
||||
e := kv.ErrKeyExists.Gen("Duplicate entry '%s' for key '%s'", entryKey, v.Name)
|
||||
txn.SetOption(kv.PresumeKeyNotExistsError, e)
|
||||
}
|
||||
if err = v.X.Create(bs, colVals, recordID); err != nil {
|
||||
if terror.ErrorEqual(err, kv.ErrKeyExists) {
|
||||
// Get the duplicate row handle
|
||||
// For insert on duplicate syntax, we should update the row
|
||||
iter, _, err1 := v.X.Seek(bs, colVals)
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
_, h, err1 := iter.Next()
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
return h, errors.Trace(err)
|
||||
}
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.DelOption(kv.PresumeKeyNotExistsError)
|
||||
// Insert new entries into indices.
|
||||
h, err := t.addIndices(ctx, recordID, r, bs)
|
||||
if err != nil {
|
||||
return h, errors.Trace(err)
|
||||
}
|
||||
|
||||
if err = t.LockRow(ctx, recordID); err != nil {
|
||||
@ -479,7 +428,6 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64,
|
||||
if col.IsPKHandleColumn(t.meta) {
|
||||
continue
|
||||
}
|
||||
|
||||
var value interface{}
|
||||
if col.State == model.StateWriteOnly || col.State == model.StateWriteReorganization {
|
||||
// if col is in write only or write reorganization state, we must add it with its default value.
|
||||
@ -510,6 +458,82 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64,
|
||||
return recordID, nil
|
||||
}
|
||||
|
||||
// Generate index content string representation.
|
||||
func (t *Table) genIndexKeyStr(colVals []interface{}) (string, error) {
|
||||
// Pass pre-composed error to txn.
|
||||
strVals := make([]string, 0, len(colVals))
|
||||
for _, cv := range colVals {
|
||||
cvs := "NULL"
|
||||
var err error
|
||||
if cv != nil {
|
||||
cvs, err = types.ToString(cv)
|
||||
if err != nil {
|
||||
return "", errors.Trace(err)
|
||||
}
|
||||
}
|
||||
strVals = append(strVals, cvs)
|
||||
}
|
||||
return strings.Join(strVals, "-"), nil
|
||||
}
|
||||
|
||||
// Add data into indices.
|
||||
func (t *Table) addIndices(ctx context.Context, recordID int64, r []interface{}, bs *kv.BufferStore) (int64, error) {
|
||||
txn, err := ctx.GetTxn(false)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
// Clean up lazy check error environment
|
||||
defer txn.DelOption(kv.PresumeKeyNotExistsError)
|
||||
if t.meta.PKIsHandle {
|
||||
// Check key exists.
|
||||
recordKey := t.RecordKey(recordID, nil)
|
||||
e := kv.ErrKeyExists.Gen("Duplicate entry '%d' for key 'PRIMARY'", recordID)
|
||||
txn.SetOption(kv.PresumeKeyNotExistsError, e)
|
||||
_, err = txn.Get(recordKey)
|
||||
if err == nil {
|
||||
return recordID, errors.Trace(e)
|
||||
} else if !terror.ErrorEqual(err, kv.ErrNotExist) {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.DelOption(kv.PresumeKeyNotExistsError)
|
||||
}
|
||||
|
||||
for _, v := range t.indices {
|
||||
if v == nil || v.State == model.StateDeleteOnly || v.State == model.StateDeleteReorganization {
|
||||
// if index is in delete only or delete reorganization state, we can't add it.
|
||||
continue
|
||||
}
|
||||
colVals, _ := v.FetchValues(r)
|
||||
var dupKeyErr error
|
||||
if v.Unique || v.Primary {
|
||||
entryKey, err1 := t.genIndexKeyStr(colVals)
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
dupKeyErr = kv.ErrKeyExists.Gen("Duplicate entry '%s' for key '%s'", entryKey, v.Name)
|
||||
txn.SetOption(kv.PresumeKeyNotExistsError, dupKeyErr)
|
||||
}
|
||||
if err = v.X.Create(bs, colVals, recordID); err != nil {
|
||||
if terror.ErrorEqual(err, kv.ErrKeyExists) {
|
||||
// Get the duplicate row handle
|
||||
// For insert on duplicate syntax, we should update the row
|
||||
iter, _, err1 := v.X.Seek(bs, colVals)
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
_, h, err1 := iter.Next()
|
||||
if err1 != nil {
|
||||
return 0, errors.Trace(err1)
|
||||
}
|
||||
return h, errors.Trace(dupKeyErr)
|
||||
}
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
txn.DelOption(kv.PresumeKeyNotExistsError)
|
||||
}
|
||||
return 0, nil
|
||||
}
|
||||
|
||||
// EncodeValue implements table.Table EncodeValue interface.
|
||||
func (t *Table) EncodeValue(raw interface{}) ([]byte, error) {
|
||||
v, err := t.flatten(raw)
|
||||
|
||||
Reference in New Issue
Block a user