diff --git a/table/tables/tables.go b/table/tables/tables.go index 2afeead33d..ecf13dca19 100644 --- a/table/tables/tables.go +++ b/table/tables/tables.go @@ -413,61 +413,10 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64, bs := kv.NewBufferStore(txn) defer bs.Release() - if t.meta.PKIsHandle { - // Check key exists. - recordKey := t.RecordKey(recordID, nil) - e := kv.ErrKeyExists.Gen("Duplicate entry '%d' for key 'PRIMARY'", recordID) - txn.SetOption(kv.PresumeKeyNotExistsError, e) - _, err = txn.Get(recordKey) - if err == nil { - return recordID, errors.Trace(e) - } else if !terror.ErrorEqual(err, kv.ErrNotExist) { - return 0, errors.Trace(err) - } - txn.DelOption(kv.PresumeKeyNotExistsError) - } - - for _, v := range t.indices { - if v == nil || v.State == model.StateDeleteOnly || v.State == model.StateDeleteReorganization { - // if index is in delete only or delete reorganization state, we can't add it. - continue - } - colVals, _ := v.FetchValues(r) - - if v.Unique || v.Primary { - // Pass pre-composed error to txn. - strVals := make([]string, 0, len(colVals)) - for _, cv := range colVals { - cvs := "NULL" - if cv != nil { - cvs, err = types.ToString(cv) - if err != nil { - return 0, errors.Trace(err) - } - } - strVals = append(strVals, cvs) - } - entryKey := strings.Join(strVals, "-") - e := kv.ErrKeyExists.Gen("Duplicate entry '%s' for key '%s'", entryKey, v.Name) - txn.SetOption(kv.PresumeKeyNotExistsError, e) - } - if err = v.X.Create(bs, colVals, recordID); err != nil { - if terror.ErrorEqual(err, kv.ErrKeyExists) { - // Get the duplicate row handle - // For insert on duplicate syntax, we should update the row - iter, _, err1 := v.X.Seek(bs, colVals) - if err1 != nil { - return 0, errors.Trace(err1) - } - _, h, err1 := iter.Next() - if err1 != nil { - return 0, errors.Trace(err1) - } - return h, errors.Trace(err) - } - return 0, errors.Trace(err) - } - txn.DelOption(kv.PresumeKeyNotExistsError) + // Insert new entries into indices. + h, err := t.addIndices(ctx, recordID, r, bs) + if err != nil { + return h, errors.Trace(err) } if err = t.LockRow(ctx, recordID); err != nil { @@ -479,7 +428,6 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64, if col.IsPKHandleColumn(t.meta) { continue } - var value interface{} if col.State == model.StateWriteOnly || col.State == model.StateWriteReorganization { // if col is in write only or write reorganization state, we must add it with its default value. @@ -510,6 +458,82 @@ func (t *Table) AddRecord(ctx context.Context, r []interface{}) (recordID int64, return recordID, nil } +// Generate index content string representation. +func (t *Table) genIndexKeyStr(colVals []interface{}) (string, error) { + // Pass pre-composed error to txn. + strVals := make([]string, 0, len(colVals)) + for _, cv := range colVals { + cvs := "NULL" + var err error + if cv != nil { + cvs, err = types.ToString(cv) + if err != nil { + return "", errors.Trace(err) + } + } + strVals = append(strVals, cvs) + } + return strings.Join(strVals, "-"), nil +} + +// Add data into indices. +func (t *Table) addIndices(ctx context.Context, recordID int64, r []interface{}, bs *kv.BufferStore) (int64, error) { + txn, err := ctx.GetTxn(false) + if err != nil { + return 0, errors.Trace(err) + } + // Clean up lazy check error environment + defer txn.DelOption(kv.PresumeKeyNotExistsError) + if t.meta.PKIsHandle { + // Check key exists. + recordKey := t.RecordKey(recordID, nil) + e := kv.ErrKeyExists.Gen("Duplicate entry '%d' for key 'PRIMARY'", recordID) + txn.SetOption(kv.PresumeKeyNotExistsError, e) + _, err = txn.Get(recordKey) + if err == nil { + return recordID, errors.Trace(e) + } else if !terror.ErrorEqual(err, kv.ErrNotExist) { + return 0, errors.Trace(err) + } + txn.DelOption(kv.PresumeKeyNotExistsError) + } + + for _, v := range t.indices { + if v == nil || v.State == model.StateDeleteOnly || v.State == model.StateDeleteReorganization { + // if index is in delete only or delete reorganization state, we can't add it. + continue + } + colVals, _ := v.FetchValues(r) + var dupKeyErr error + if v.Unique || v.Primary { + entryKey, err1 := t.genIndexKeyStr(colVals) + if err1 != nil { + return 0, errors.Trace(err1) + } + dupKeyErr = kv.ErrKeyExists.Gen("Duplicate entry '%s' for key '%s'", entryKey, v.Name) + txn.SetOption(kv.PresumeKeyNotExistsError, dupKeyErr) + } + if err = v.X.Create(bs, colVals, recordID); err != nil { + if terror.ErrorEqual(err, kv.ErrKeyExists) { + // Get the duplicate row handle + // For insert on duplicate syntax, we should update the row + iter, _, err1 := v.X.Seek(bs, colVals) + if err1 != nil { + return 0, errors.Trace(err1) + } + _, h, err1 := iter.Next() + if err1 != nil { + return 0, errors.Trace(err1) + } + return h, errors.Trace(dupKeyErr) + } + return 0, errors.Trace(err) + } + txn.DelOption(kv.PresumeKeyNotExistsError) + } + return 0, nil +} + // EncodeValue implements table.Table EncodeValue interface. func (t *Table) EncodeValue(raw interface{}) ([]byte, error) { v, err := t.flatten(raw)