diff --git a/ddl/column.go b/ddl/column.go
index ff692eb440..ed14497ece 100644
--- a/ddl/column.go
+++ b/ddl/column.go
@@ -346,7 +346,7 @@ func (d *ddl) backfillColumnInTxn(t table.Table, colMeta *columnMeta, handles []
 	}
 	newColumnIDs = append(newColumnIDs, colMeta.colID)
 	newRow = append(newRow, colMeta.defaultVal)
-	newRowVal, err := tablecodec.EncodeRow(sc, newRow, newColumnIDs)
+	newRowVal, err := tablecodec.EncodeRow(sc, newRow, newColumnIDs, nil, nil)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
diff --git a/executor/builder.go b/executor/builder.go
index 1b68188337..1e132e235d 100644
--- a/executor/builder.go
+++ b/executor/builder.go
@@ -354,6 +354,7 @@ func (b *executorBuilder) buildInsert(v *plan.Insert) Executor {
 	}
 
 	ivs.Table = v.Table
+	ivs.initDefaultValBuf()
 	if v.IsReplace {
 		return b.buildReplace(ivs)
 	}
@@ -380,6 +381,7 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor {
 		GenColumns: v.GenCols.Columns,
 		GenExprs:   v.GenCols.Exprs,
 	}
+	insertVal.initDefaultValBuf()
 	tableCols := tbl.Cols()
 	columns, err := insertVal.getColumns(tableCols)
 	if err != nil {
diff --git a/executor/write.go b/executor/write.go
index e28d457a6e..9e51992ccc 100644
--- a/executor/write.go
+++ b/executor/write.go
@@ -466,9 +466,11 @@ func (e *DeleteExec) Open(goCtx goctx.Context) error {
 
 // NewLoadDataInfo returns a LoadDataInfo structure, and it's only used for tests now.
 func NewLoadDataInfo(row []types.Datum, ctx context.Context, tbl table.Table, cols []*table.Column) *LoadDataInfo {
+	insertVal := &InsertValues{baseExecutor: newBaseExecutor(nil, ctx), Table: tbl}
+	insertVal.initDefaultValBuf()
 	return &LoadDataInfo{
 		row:       row,
-		insertVal: &InsertValues{baseExecutor: newBaseExecutor(nil, ctx), Table: tbl},
+		insertVal: insertVal,
 		Table:     tbl,
 		Ctx:       ctx,
 		columns:   cols,
@@ -806,6 +808,12 @@ func (e *LoadData) Open(goCtx goctx.Context) error {
 	return nil
 }
 
+type defaultVal struct {
+	val types.Datum
+	// valid indicates whether val has been evaluated; the default value is evaluated lazily.
+	valid bool
+}
+
 // InsertValues is the data to insert.
 type InsertValues struct {
 	baseExecutor
@@ -825,6 +833,9 @@ type InsertValues struct {
 
 	GenColumns []*ast.ColumnName
 	GenExprs   []expression.Expression
+
+	// colDefaultVals caches the cast default values of the columns.
+	colDefaultVals []defaultVal
 }
 
 // InsertExec represents an insert executor.
@@ -839,13 +850,20 @@ type InsertExec struct {
 	finished bool
 }
 
+func (e *InsertValues) initDefaultValBuf() {
+	e.colDefaultVals = make([]defaultVal, len(e.Table.Cols()))
+}
+
 func (e *InsertExec) exec(goCtx goctx.Context, rows [][]types.Datum) (Row, error) {
 	// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
-	batchInsert := e.ctx.GetSessionVars().BatchInsert && !e.ctx.GetSessionVars().InTxn()
-	batchSize := e.ctx.GetSessionVars().DMLBatchSize
+	sessVars := e.ctx.GetSessionVars()
+	batchInsert := sessVars.BatchInsert && !sessVars.InTxn()
+	batchSize := sessVars.DMLBatchSize
 	txn := e.ctx.Txn()
 	rowCount := 0
+	sessVars.BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap)
+	defer sessVars.CleanBuffers()
 	for _, row := range rows {
 		if batchInsert && rowCount >= batchSize {
 			if err := e.ctx.NewTxn(); err != nil {
@@ -854,6 +872,7 @@
 			}
 			txn = e.ctx.Txn()
 			rowCount = 0
+			sessVars.BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap)
 		}
 		if len(e.OnDuplicate) == 0 && !e.IgnoreErr {
 			txn.SetOption(kv.PresumeKeyNotExists, nil)
@@ -861,7 +880,9 @@
 		h, err := e.Table.AddRecord(e.ctx, row, false)
 		txn.DelOption(kv.PresumeKeyNotExists)
 		if err == nil {
-			getDirtyDB(e.ctx).addRow(e.Table.Meta().ID, h, row)
+			if !sessVars.ImportingData {
+				getDirtyDB(e.ctx).addRow(e.Table.Meta().ID, h, row)
+			}
 			rowCount++
 			continue
 		}
@@ -886,7 +907,7 @@
 	}
 
 	if e.lastInsertID != 0 {
-		e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
+		sessVars.SetLastInsertID(e.lastInsertID)
 	}
 	e.finished = true
 	return nil, nil
@@ -1234,6 +1255,21 @@ func (e *InsertValues) filterErr(err error, ignoreErr bool) error {
 	return nil
 }
 
+func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (types.Datum, error) {
+	if e.colDefaultVals[idx].valid {
+		return e.colDefaultVals[idx].val, nil
+	}
+
+	defaultVal, err := table.GetColDefaultValue(e.ctx, col.ToInfo())
+	if err != nil {
+		return types.Datum{}, errors.Trace(err)
+	}
+
+	e.colDefaultVals[idx].val = defaultVal
+	e.colDefaultVals[idx].valid = true
+	return defaultVal, nil
+}
+
 // initDefaultValues fills generated columns, auto_increment column and empty column.
 // For NOT NULL column, it will return error or use zero value based on sql_mode.
 func (e *InsertValues) initDefaultValues(row []types.Datum, hasValue []bool, ignoreErr bool) error {
@@ -1258,7 +1294,7 @@
 		}
 		if needDefaultValue {
 			var err error
-			row[i], err = table.GetColDefaultValue(e.ctx, c.ToInfo())
+			row[i], err = e.getColDefaultValue(i, c)
 			if e.filterErr(err, ignoreErr) != nil {
 				return errors.Trace(err)
 			}
diff --git a/kv/buffer_store.go b/kv/buffer_store.go
index e0858d6dbd..d77ef9b24b 100644
--- a/kv/buffer_store.go
+++ b/kv/buffer_store.go
@@ -17,6 +17,15 @@ import (
 	"github.com/juju/errors"
 )
 
+var (
+	// DefaultTxnMembufCap is the default capacity of a transaction's membuf.
+	DefaultTxnMembufCap = 4 * 1024
+	// ImportingTxnMembufCap is the membuf capacity used when TiDB is importing data.
+	ImportingTxnMembufCap = 32 * 1024
+	// TempTxnMemBufCap is the capacity of a temporary membuf.
+	TempTxnMemBufCap = 64
+)
+
 // BufferStore wraps a Retriever for read and a MemBuffer for buffered write.
 // Common usage pattern:
 //	bs := NewBufferStore(r) // use BufferStore to wrap a Retriever
@@ -30,13 +39,26 @@ type BufferStore struct {
 }
 
 // NewBufferStore creates a BufferStore using r for read.
-func NewBufferStore(r Retriever) *BufferStore {
+func NewBufferStore(r Retriever, cap int) *BufferStore {
+	if cap <= 0 {
+		cap = DefaultTxnMembufCap
+	}
 	return &BufferStore{
 		r:         r,
-		MemBuffer: &lazyMemBuffer{},
+		MemBuffer: &lazyMemBuffer{cap: cap},
 	}
 }
 
+// Reset resets s.MemBuffer.
+func (s *BufferStore) Reset() {
+	s.MemBuffer.Reset()
+}
+
+// SetCap sets the MemBuffer capacity.
+func (s *BufferStore) SetCap(cap int) {
+	s.MemBuffer.SetCap(cap)
+}
+
 // Get implements the Retriever interface.
 func (s *BufferStore) Get(k Key) ([]byte, error) {
 	val, err := s.MemBuffer.Get(k)
diff --git a/kv/buffer_store_test.go b/kv/buffer_store_test.go
index 0fb2ad82c9..af84a9294c 100644
--- a/kv/buffer_store_test.go
+++ b/kv/buffer_store_test.go
@@ -25,7 +25,7 @@ type testBufferStoreSuite struct{}
 
 var _ = Suite(testBufferStoreSuite{})
 
 func (s testBufferStoreSuite) TestGetSet(c *C) {
-	bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer()})
+	bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
 	key := Key("key")
 	value, err := bs.Get(key)
 	c.Check(err, NotNil)
@@ -39,7 +39,7 @@ func (s testBufferStoreSuite) TestGetSet(c *C) {
 }
 
 func (s testBufferStoreSuite) TestSaveTo(c *C) {
-	bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer()})
+	bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
 	var buf bytes.Buffer
 	for i := 0; i < 10; i++ {
 		fmt.Fprint(&buf, i)
@@ -49,7 +49,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
 	}
 	bs.Set(Key("novalue"), nil)
 
-	mutator := NewMemDbBuffer()
+	mutator := NewMemDbBuffer(DefaultTxnMembufCap)
 	err := bs.SaveTo(mutator)
 	c.Check(err, IsNil)
 
diff --git a/kv/kv.go b/kv/kv.go
index f21b8d499f..adf9a21c18 100644
--- a/kv/kv.go
+++ b/kv/kv.go
@@ -109,6 +109,11 @@ type MemBuffer interface {
 	Size() int
 	// Len returns the number of entries in the DB.
 	Len() int
+	// Reset cleans up the MemBuffer.
+	Reset()
+	// SetCap sets the MemBuffer capacity to reduce memory allocations.
+	// Call it before the MemBuffer is used; otherwise it has no effect.
+	SetCap(cap int)
 }
 
 // Transaction defines the interface for operations inside a Transaction.
diff --git a/kv/mem_buffer_test.go b/kv/mem_buffer_test.go
index 11befecf06..92807b528d 100644
--- a/kv/mem_buffer_test.go
+++ b/kv/mem_buffer_test.go
@@ -43,11 +43,11 @@ type testKVSuite struct {
 
 func (s *testKVSuite) SetUpSuite(c *C) {
 	s.bs = make([]MemBuffer, 1)
-	s.bs[0] = NewMemDbBuffer()
+	s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
 }
 
 func (s *testKVSuite) ResetMembuffers() {
-	s.bs[0] = NewMemDbBuffer()
+	s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
 }
 
 func insertData(c *C, buffer MemBuffer) {
@@ -152,7 +152,7 @@ func (s *testKVSuite) TestNewIterator(c *C) {
 
 func (s *testKVSuite) TestIterNextUntil(c *C) {
 	defer testleak.AfterTest(c)()
-	buffer := NewMemDbBuffer()
+	buffer := NewMemDbBuffer(DefaultTxnMembufCap)
 	insertData(c, buffer)
 
 	iter, err := buffer.Seek(nil)
@@ -209,7 +209,7 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
 }
 
 func (s *testKVSuite) TestBufferLimit(c *C) {
-	buffer := NewMemDbBuffer().(*memDbBuffer)
+	buffer := NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
 	buffer.bufferSizeLimit = 1000
 	buffer.entrySizeLimit = 500
 	var err error
@@ -222,7 +222,7 @@ func (s *testKVSuite) TestBufferLimit(c *C) {
 	err = buffer.Set([]byte("yz"), make([]byte, 499))
 	c.Assert(err, NotNil) // buffer size limit
 
-	buffer = NewMemDbBuffer().(*memDbBuffer)
+	buffer = NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
 	buffer.bufferLenLimit = 10
 	for i := 0; i < 10; i++ {
 		err = buffer.Set([]byte{byte(i)}, []byte{byte(i)})
@@ -239,7 +239,7 @@ func BenchmarkMemDbBufferSequential(b *testing.B) {
 	for i := 0; i < opCnt; i++ {
 		data[i] = encodeInt(i)
 	}
-	buffer := NewMemDbBuffer()
+	buffer := NewMemDbBuffer(DefaultTxnMembufCap)
 	benchmarkSetGet(b, buffer, data)
 	b.ReportAllocs()
 }
@@ -250,20 +250,20 @@ func BenchmarkMemDbBufferRandom(b *testing.B) {
 		data[i] = encodeInt(i)
 	}
 	shuffle(data)
-	buffer := NewMemDbBuffer()
+	buffer := NewMemDbBuffer(DefaultTxnMembufCap)
 	benchmarkSetGet(b, buffer, data)
 	b.ReportAllocs()
 }
 
 func BenchmarkMemDbIter(b *testing.B) {
-	buffer := NewMemDbBuffer()
+	buffer := NewMemDbBuffer(DefaultTxnMembufCap)
 	benchIterator(b, buffer)
 	b.ReportAllocs()
 }
 
 func BenchmarkMemDbCreation(b *testing.B) {
 	for i := 0; i < b.N; i++ {
-		NewMemDbBuffer()
+		NewMemDbBuffer(DefaultTxnMembufCap)
 	}
 	b.ReportAllocs()
 }
diff --git a/kv/memdb_buffer.go b/kv/memdb_buffer.go
index 20bdc73b03..7e0047b74c 100644
--- a/kv/memdb_buffer.go
+++ b/kv/memdb_buffer.go
@@ -41,9 +41,9 @@ type memDbIter struct {
 }
 
 // NewMemDbBuffer creates a new memDbBuffer.
-func NewMemDbBuffer() MemBuffer {
+func NewMemDbBuffer(cap int) MemBuffer {
 	return &memDbBuffer{
-		db:              memdb.New(comparer.DefaultComparer, 4*1024),
+		db:              memdb.New(comparer.DefaultComparer, cap),
 		entrySizeLimit:  TxnEntrySizeLimit,
 		bufferLenLimit:  atomic.LoadUint64(&TxnEntryCountLimit),
 		bufferSizeLimit: TxnTotalSizeLimit,
@@ -65,6 +65,10 @@ func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
 	return i, nil
 }
 
+func (m *memDbBuffer) SetCap(cap int) {
+
+}
+
 func (m *memDbBuffer) SeekReverse(k Key) (Iterator, error) {
 	var i *memDbIter
 	if k == nil {
@@ -120,6 +124,11 @@ func (m *memDbBuffer) Len() int {
 	return m.db.Len()
 }
 
+// Reset cleans up the MemBuffer.
+func (m *memDbBuffer) Reset() {
+	m.db.Reset()
+}
+
 // Next implements the Iterator Next.
 func (i *memDbIter) Next() error {
 	if i.reverse {
diff --git a/kv/mock.go b/kv/mock.go
index ca909709e5..7e39ac5b48 100644
--- a/kv/mock.go
+++ b/kv/mock.go
@@ -99,6 +99,14 @@ func (t *mockTxn) GetMemBuffer() MemBuffer {
 	return nil
 }
 
+func (t *mockTxn) SetCap(cap int) {
+
+}
+
+func (t *mockTxn) Reset() {
+	t.valid = false
+}
+
 // NewMockTxn new a mockTxn.
 func NewMockTxn() Transaction {
 	return &mockTxn{
@@ -126,7 +134,7 @@ func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) {
 
 func (s *mockStorage) GetSnapshot(ver Version) (Snapshot, error) {
 	return &mockSnapshot{
-		store: NewMemDbBuffer(),
+		store: NewMemDbBuffer(DefaultTxnMembufCap),
 	}, nil
 }
 
diff --git a/kv/union_store.go b/kv/union_store.go
index db6e776be4..7823528ff4 100644
--- a/kv/union_store.go
+++ b/kv/union_store.go
@@ -68,7 +68,7 @@ type unionStore struct {
 // NewUnionStore builds a new UnionStore.
 func NewUnionStore(snapshot Snapshot) UnionStore {
 	return &unionStore{
-		BufferStore:        NewBufferStore(snapshot),
+		BufferStore:        NewBufferStore(snapshot, DefaultTxnMembufCap),
 		snapshot:           snapshot,
 		lazyConditionPairs: make(map[string]*conditionPair),
 		opts:               make(map[Option]interface{}),
@@ -99,7 +99,8 @@ func (it invalidIterator) Close() {}
 
 // lazyMemBuffer wraps a MemBuffer which is to be initialized when it is modified.
 type lazyMemBuffer struct {
-	mb MemBuffer
+	mb  MemBuffer
+	cap int
 }
 
 func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) {
@@ -112,7 +113,7 @@ func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) {
 
 func (lmb *lazyMemBuffer) Set(key Key, value []byte) error {
 	if lmb.mb == nil {
-		lmb.mb = NewMemDbBuffer()
+		lmb.mb = NewMemDbBuffer(lmb.cap)
 	}
 
 	return lmb.mb.Set(key, value)
@@ -120,7 +121,7 @@ func (lmb *lazyMemBuffer) Set(key Key, value []byte) error {
 
 func (lmb *lazyMemBuffer) Delete(k Key) error {
 	if lmb.mb == nil {
-		lmb.mb = NewMemDbBuffer()
+		lmb.mb = NewMemDbBuffer(lmb.cap)
 	}
 
 	return lmb.mb.Delete(k)
@@ -154,6 +155,16 @@ func (lmb *lazyMemBuffer) Len() int {
 	return lmb.mb.Len()
 }
 
+func (lmb *lazyMemBuffer) Reset() {
+	if lmb.mb != nil {
+		lmb.mb.Reset()
+	}
+}
+
+func (lmb *lazyMemBuffer) SetCap(cap int) {
+	lmb.cap = cap
+}
+
 // Get implements the Retriever interface.
 func (us *unionStore) Get(k Key) ([]byte, error) {
 	v, err := us.MemBuffer.Get(k)
@@ -233,10 +244,20 @@ func (us *unionStore) GetOption(opt Option) interface{} {
 	return us.opts[opt]
 }
 
+// GetMemBuffer returns the MemBuffer bound to this unionStore.
 func (us *unionStore) GetMemBuffer() MemBuffer {
 	return us.BufferStore.MemBuffer
 }
 
+// SetCap sets the membuf capacity.
+func (us *unionStore) SetCap(cap int) {
+	us.BufferStore.SetCap(cap)
+}
+
+func (us *unionStore) Reset() {
+	us.BufferStore.Reset()
+}
+
 type options map[Option]interface{}
 
 func (opts options) Get(opt Option) (interface{}, bool) {
diff --git a/kv/union_store_test.go b/kv/union_store_test.go
index 810b6cb143..c79c220766 100644
--- a/kv/union_store_test.go
+++ b/kv/union_store_test.go
@@ -27,7 +27,7 @@ type testUnionStoreSuite struct {
 }
 
 func (s *testUnionStoreSuite) SetUpTest(c *C) {
-	s.store = NewMemDbBuffer()
+	s.store = NewMemDbBuffer(DefaultTxnMembufCap)
 	s.us = NewUnionStore(&mockSnapshot{s.store})
 }
 
diff --git a/kv/utils_test.go b/kv/utils_test.go
index aaf1971c2b..c8e02006ec 100644
--- a/kv/utils_test.go
+++ b/kv/utils_test.go
@@ -23,7 +23,7 @@ type testUtilsSuite struct {
 }
 
 func (s testUtilsSuite) TestIncInt64(c *C) {
-	mb := NewMemDbBuffer()
+	mb := NewMemDbBuffer(DefaultTxnMembufCap)
 	key := Key("key")
 	v, err := IncInt64(mb, key, 1)
 	c.Check(err, IsNil)
@@ -38,7 +38,7 @@ func (s testUtilsSuite) TestIncInt64(c *C) {
 }
 
 func (s testUtilsSuite) TestGetInt64(c *C) {
-	mb := NewMemDbBuffer()
+	mb := NewMemDbBuffer(DefaultTxnMembufCap)
 	key := Key("key")
 	v, err := GetInt64(mb, key)
 	c.Check(v, Equals, int64(0))
diff --git a/session.go b/session.go
index 3c9ce6fa55..32aa255c9d 100644
--- a/session.go
+++ b/session.go
@@ -135,6 +135,14 @@ type session struct {
 	statsCollector *statistics.SessionStatsCollector
 }
 
+func (s *session) getMembufCap() int {
+	if s.sessionVars.ImportingData {
+		return kv.ImportingTxnMembufCap
+	}
+
+	return kv.DefaultTxnMembufCap
+}
+
 func (s *session) cleanRetryInfo() {
 	if !s.sessionVars.RetryInfo.Retrying {
 		retryInfo := s.sessionVars.RetryInfo
@@ -882,6 +890,7 @@
 	if err != nil {
 		return errors.Trace(err)
 	}
+	txn.SetCap(s.getMembufCap())
 	s.txn = txn
 	s.sessionVars.TxnCtx.StartTS = txn.StartTS()
 	return nil
@@ -1276,10 +1285,12 @@
 	}
 	future := s.txnFuture
 	s.txnFuture = nil
+
 	txn, err := future.wait()
 	if err != nil {
 		return errors.Trace(err)
 	}
+	txn.SetCap(s.getMembufCap())
 	s.txn = txn
 	s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
 	if s.sessionVars.Systems[variable.TxnIsolation] == ast.ReadCommitted {
@@ -1296,6 +1307,7 @@
 	if s.txnFuture == nil {
 		return errors.New("transaction channel is not set")
 	}
+	// No need to get the txn from txnFutureCh since the txn should be initialized with startTS.
 	s.txnFuture = nil
 
 	var err error
@@ -1303,6 +1315,11 @@
 	if err != nil {
 		return errors.Trace(err)
 	}
+	s.txn.SetCap(s.getMembufCap())
+	err = s.loadCommonGlobalVariablesIfNeeded()
+	if err != nil {
+		return errors.Trace(err)
+	}
 	return nil
 }
 
diff --git a/sessionctx/variable/session.go b/sessionctx/variable/session.go
index a420cd7b0d..df4f8736b6 100644
--- a/sessionctx/variable/session.go
+++ b/sessionctx/variable/session.go
@@ -18,10 +18,12 @@
 import (
 	"sync"
 	"time"
 
+	"github.com/pingcap/tidb/kv"
 	"github.com/pingcap/tidb/meta/autoid"
 	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
 	"github.com/pingcap/tidb/terror"
+	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util/auth"
 )
 
@@ -248,6 +250,14 @@ type SessionVars struct {
 	// EnableChunk indicates whether the chunk execution model is enabled.
 	// TODO: remove this after tidb-server configuration "enable-chunk' removed.
 	EnableChunk bool
+
+	// RowValBuf is reused by tablecodec.EncodeRow to reduce calls to runtime.growslice.
+	RowValBuf []byte
+	// BufStore stores temporary KVs for a row when executing an insert statement.
+	// It is reused across the rows of a session to reduce memory allocations.
+	BufStore *kv.BufferStore
+	// AddRowValues stores temporary insert row values to reduce memory allocations when importing data.
+	AddRowValues []types.Datum
 }
 
 // NewSessionVars creates a session vars object.
@@ -275,6 +285,13 @@
 	}
 }
 
+// CleanBuffers cleans up the temporary buffers.
+func (s *SessionVars) CleanBuffers() {
+	s.RowValBuf = nil
+	s.BufStore = nil
+	s.AddRowValues = nil
+}
+
 // GetCharsetInfo gets charset and collation for current context.
 // What character set should the server translate a statement to after receiving it?
 // For this, the server uses the character_set_connection and collation_connection system variables.
diff --git a/store/tikv/mocktikv/cluster_test.go b/store/tikv/mocktikv/cluster_test.go
index 6f9742f50a..148ee2b087 100644
--- a/store/tikv/mocktikv/cluster_test.go
+++ b/store/tikv/mocktikv/cluster_test.go
@@ -60,7 +60,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
 		rowKey := tablecodec.EncodeRowKeyWithHandle(tblID, handle)
 		colValue := types.NewStringDatum(strconv.Itoa(int(handle)))
 		// TODO: Should use session's TimeZone instead of UTC.
-		rowValue, err1 := tablecodec.EncodeRow(sc, []types.Datum{colValue}, []int64{colID})
+		rowValue, err1 := tablecodec.EncodeRow(sc, []types.Datum{colValue}, []int64{colID}, nil, nil)
 		c.Assert(err1, IsNil)
 		txn.Set(rowKey, rowValue)
 
diff --git a/store/tikv/txn.go b/store/tikv/txn.go
index 175d952a1b..21139e75cd 100644
--- a/store/tikv/txn.go
+++ b/store/tikv/txn.go
@@ -65,6 +65,16 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
 	}, nil
 }
 
+// SetCap sets the transaction's MemBuffer capacity to reduce memory allocations.
+func (txn *tikvTxn) SetCap(cap int) {
+	txn.us.SetCap(cap)
+}
+
+// Reset resets the tikvTxn's membuf.
+func (txn *tikvTxn) Reset() {
+	txn.us.Reset()
+}
+
 // Get implements transaction interface.
 func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) {
 	txnCmdCounter.WithLabelValues("get").Inc()
diff --git a/table/tables/tables.go b/table/tables/tables.go
index ca0b8a4e2f..e612d8a5fd 100644
--- a/table/tables/tables.go
+++ b/table/tables/tables.go
@@ -30,12 +30,13 @@ import (
 	"github.com/pingcap/tidb/mysql"
 	"github.com/pingcap/tidb/sessionctx/binloginfo"
 	"github.com/pingcap/tidb/sessionctx/stmtctx"
+	"github.com/pingcap/tidb/sessionctx/variable"
 	"github.com/pingcap/tidb/table"
 	"github.com/pingcap/tidb/tablecodec"
 	"github.com/pingcap/tidb/types"
 	"github.com/pingcap/tidb/util"
 	"github.com/pingcap/tidb/util/codec"
-	"github.com/pingcap/tipb/go-binlog"
+	binlog "github.com/pingcap/tipb/go-binlog"
 	log "github.com/sirupsen/logrus"
 	"github.com/spaolacci/murmur3"
 )
@@ -215,7 +216,8 @@ func (t *Table) FirstKey() kv.Key {
 // Length of `oldData` and `newData` equals to length of `t.WritableCols()`.
 func (t *Table) UpdateRecord(ctx context.Context, h int64, oldData, newData []types.Datum, touched []bool) error {
 	txn := ctx.Txn()
-	bs := kv.NewBufferStore(txn)
+	// TODO: reuse bs, like AddRecord does.
+	bs := kv.NewBufferStore(txn, kv.DefaultTxnMembufCap)
 
 	// rebuild index
 	err := t.rebuildIndices(ctx, bs, h, touched, oldData, newData)
@@ -255,7 +257,7 @@
 	}
 
 	key := t.RecordKey(h)
-	value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs)
+	value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -308,6 +310,17 @@
 	return nil
 }
 
+// adjustRowValuesBuf adjusts the length of sessVars.AddRowValues, which holds the values passed to tablecodec.EncodeRow.
+// The encoded row layout is `id1, colval, id2, colval, ...`, so the correct length is rowLen * 2. If the inserted row
+// contains null values, AddRecord skips them, so rowLen may differ between rows and the buffer has to be adjusted each time.
+func adjustRowValuesBuf(sessVars *variable.SessionVars, rowLen int) {
+	adjustLen := rowLen * 2
+	if sessVars.AddRowValues == nil || cap(sessVars.AddRowValues) < adjustLen {
+		sessVars.AddRowValues = make([]types.Datum, adjustLen)
+	}
+	sessVars.AddRowValues = sessVars.AddRowValues[:adjustLen]
+}
+
 // AddRecord implements table.Table AddRecord interface.
 func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck bool) (recordID int64, err error) {
 	var hasRecordID bool
@@ -326,9 +339,14 @@
 	}
 
 	txn := ctx.Txn()
-	bs := kv.NewBufferStore(txn)
-
-	skipCheck := ctx.GetSessionVars().ImportingData
+	sessVars := ctx.GetSessionVars()
+	// When ImportingData is true, there is no need to check the key constraints, hence the variable is named skipCheck.
+	skipCheck := sessVars.ImportingData
+	bs := sessVars.BufStore
+	if bs == nil {
+		bs = kv.NewBufferStore(ctx.Txn(), kv.DefaultTxnMembufCap)
+	}
+	bs.Reset()
 	if skipCheck {
 		txn.SetOption(kv.SkipCheckForWrite, true)
 	}
@@ -360,12 +378,14 @@
 			row = append(row, value)
 		}
 	}
+	adjustRowValuesBuf(sessVars, len(row))
 	key := t.RecordKey(recordID)
-	value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs)
+	sessVars.RowValBuf, err = tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, sessVars.RowValBuf, sessVars.AddRowValues)
 	if err != nil {
 		return 0, errors.Trace(err)
 	}
+	value := sessVars.RowValBuf
 	if err = txn.Set(key, value); err != nil {
 		return 0, errors.Trace(err)
 	}
@@ -381,8 +401,8 @@
 			return 0, errors.Trace(err)
 		}
 	}
-	ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
-	ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(t.ID, 1, 1)
+	sessVars.StmtCtx.AddAffectedRows(1)
+	sessVars.TxnCtx.UpdateDeltaForTable(t.ID, 1, 1)
 	return recordID, nil
 }
 
@@ -526,7 +546,7 @@ func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum,
 	if err != nil {
 		return errors.Trace(err)
 	}
-	value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs)
+	value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -537,11 +557,11 @@ func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum,
 }
 
 func (t *Table) addUpdateBinlog(ctx context.Context, oldRow, newRow []types.Datum, colIDs []int64) error {
-	old, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs)
+	old, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs, nil, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
-	newVal, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs)
+	newVal, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs, nil, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -553,7 +573,7 @@ func (t *Table) addUpdateBinlog(ctx context.Context, oldRow, newRow []types.Datu
 }
 
 func (t *Table) addDeleteBinlog(ctx context.Context, r []types.Datum, colIDs []int64) error {
-	data, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, r, colIDs)
+	data, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, r, colIDs, nil, nil)
 	if err != nil {
 		return errors.Trace(err)
 	}
diff --git a/tablecodec/tablecodec.go b/tablecodec/tablecodec.go
index ccc3bfd9d9..467d74a8a1 100644
--- a/tablecodec/tablecodec.go
+++ b/tablecodec/tablecodec.go
@@ -159,7 +159,8 @@ func DecodeRowKey(key kv.Key) (int64, error) {
 
 // EncodeValue encodes a go value to bytes.
 func EncodeValue(sc *stmtctx.StatementContext, raw types.Datum) ([]byte, error) {
-	v, err := flatten(raw, sc.TimeZone)
+	var v types.Datum
+	err := flatten(raw, sc.TimeZone, &v)
 	if err != nil {
 		return nil, errors.Trace(err)
 	}
@@ -169,29 +170,32 @@ func EncodeValue(sc *stmtctx.StatementContext, raw types.Datum) ([]byte, error)
 
 // EncodeRow encode row data and column ids into a slice of byte.
 // Row layout: colID1, value1, colID2, value2, .....
-func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64) ([]byte, error) {
+// valBuf and values are passed in by the caller so that EncodeRow does not have to allocate temporary buffers.
+// If valBuf and values are nil, EncodeRow allocates them itself.
+func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64, valBuf []byte, values []types.Datum) ([]byte, error) {
 	if len(row) != len(colIDs) {
 		return nil, errors.Errorf("EncodeRow error: data and columnID count not match %d vs %d", len(row), len(colIDs))
 	}
-	values := make([]types.Datum, 2*len(row))
+	valBuf = valBuf[:0]
+	if values == nil {
+		values = make([]types.Datum, len(row)*2)
+	}
 	for i, c := range row {
 		id := colIDs[i]
-		idv := types.NewIntDatum(id)
-		values[2*i] = idv
-		fc, err := flatten(c, sc.TimeZone)
+		values[2*i].SetInt64(id)
+		err := flatten(c, sc.TimeZone, &values[2*i+1])
 		if err != nil {
 			return nil, errors.Trace(err)
 		}
-		values[2*i+1] = fc
 	}
 	if len(values) == 0 {
 		// We could not set nil value into kv.
 		return []byte{codec.NilFlag}, nil
 	}
-	return codec.EncodeValue(sc, nil, values...)
+	return codec.EncodeValue(sc, valBuf, values...)
 }
 
-func flatten(data types.Datum, loc *time.Location) (types.Datum, error) {
+func flatten(data types.Datum, loc *time.Location, ret *types.Datum) error {
 	switch data.Kind() {
 	case types.KindMysqlTime:
 		// for mysql datetime, timestamp and date type
@@ -199,31 +203,33 @@
 		if t.Type == mysql.TypeTimestamp && loc != time.UTC {
 			err := t.ConvertTimeZone(loc, time.UTC)
 			if err != nil {
-				return data, errors.Trace(err)
+				return errors.Trace(err)
 			}
 		}
 		v, err := t.ToPackedUint()
-		return types.NewUintDatum(v), errors.Trace(err)
+		ret.SetUint64(v)
+		return errors.Trace(err)
 	case types.KindMysqlDuration:
 		// for mysql time type
-		data.SetInt64(int64(data.GetMysqlDuration().Duration))
-		return data, nil
+		ret.SetInt64(int64(data.GetMysqlDuration().Duration))
+		return nil
 	case types.KindMysqlEnum:
-		data.SetUint64(data.GetMysqlEnum().Value)
-		return data, nil
+		ret.SetUint64(data.GetMysqlEnum().Value)
+		return nil
 	case types.KindMysqlSet:
-		data.SetUint64(data.GetMysqlSet().Value)
-		return data, nil
+		ret.SetUint64(data.GetMysqlSet().Value)
+		return nil
 	case types.KindBinaryLiteral, types.KindMysqlBit:
 		// We don't need to handle errors here since the literal is ensured to be able to store in uint64 in convertToMysqlBit.
 		val, err := data.GetBinaryLiteral().ToInt()
 		if err != nil {
-			return data, errors.Trace(err)
+			return errors.Trace(err)
 		}
-		data.SetUint64(val)
-		return data, nil
+		ret.SetUint64(val)
+		return nil
 	default:
-		return data, nil
+		*ret = data
+		return nil
 	}
 }
 
diff --git a/tablecodec/tablecodec_test.go b/tablecodec/tablecodec_test.go
index a6a855e023..93fed47df5 100644
--- a/tablecodec/tablecodec_test.go
+++ b/tablecodec/tablecodec_test.go
@@ -73,7 +73,7 @@ func (s *testTableCodecSuite) TestRowCodec(c *C) {
 		colIDs = append(colIDs, col.id)
 	}
 	sc := &stmtctx.StatementContext{TimeZone: time.Local}
-	bs, err := EncodeRow(sc, row, colIDs)
+	bs, err := EncodeRow(sc, row, colIDs, nil, nil)
 	c.Assert(err, IsNil)
 	c.Assert(bs, NotNil)
 
@@ -128,7 +128,7 @@ func (s *testTableCodecSuite) TestRowCodec(c *C) {
 	}
 
 	// Make sure empty row return not nil value.
-	bs, err = EncodeRow(sc, []types.Datum{}, []int64{})
+	bs, err = EncodeRow(sc, []types.Datum{}, []int64{}, nil, nil)
 	c.Assert(err, IsNil)
 	c.Assert(bs, HasLen, 1)
 
@@ -163,7 +163,7 @@ func (s *testTableCodecSuite) TestTimeCodec(c *C) {
 		colIDs = append(colIDs, col.id)
 	}
 	sc := &stmtctx.StatementContext{TimeZone: time.Local}
-	bs, err := EncodeRow(sc, row, colIDs)
+	bs, err := EncodeRow(sc, row, colIDs, nil, nil)
 	c.Assert(err, IsNil)
 	c.Assert(bs, NotNil)
 
@@ -213,7 +213,7 @@ func (s *testTableCodecSuite) TestCutRow(c *C) {
 	for _, col := range cols {
 		colIDs = append(colIDs, col.id)
 	}
-	bs, err := EncodeRow(sc, row, colIDs)
+	bs, err := EncodeRow(sc, row, colIDs, nil, nil)
 	c.Assert(err, IsNil)
 	c.Assert(bs, NotNil)
 
diff --git a/types/datum.go b/types/datum.go
index 37b906c3b5..d903341fd0 100644
--- a/types/datum.go
+++ b/types/datum.go
@@ -20,6 +20,7 @@ import (
 	"strconv"
 	"strings"
 	"time"
+	"unicode/utf8"
 
 	"github.com/juju/errors"
 	"github.com/pingcap/tidb/mysql"
@@ -798,7 +799,7 @@ func ProduceStrWithSpecifiedTp(s string, tp *FieldType, sc *stmtctx.StatementCon
 	// Flen is the rune length, not binary length, for UTF8 charset, we need to calculate the
 	// rune count and truncate to Flen runes if it is too long.
 	if chs == charset.CharsetUTF8 || chs == charset.CharsetUTF8MB4 {
-		characterLen := len([]rune(s))
+		characterLen := utf8.RuneCountInString(s)
 		if characterLen > flen {
 			// 1. If len(s) is 0 and flen is 0, truncateLen will be 0, don't truncate s.
 			// CREATE TABLE t (a char(0));
diff --git a/util/admin/admin_test.go b/util/admin/admin_test.go
index 8b50ecf36a..55aec5cf98 100644
--- a/util/admin/admin_test.go
+++ b/util/admin/admin_test.go
@@ -501,7 +501,7 @@ func setColValue(c *C, txn kv.Transaction, key kv.Key, v types.Datum) {
 	row := []types.Datum{v, {}}
 	colIDs := []int64{2, 3}
 	sc := &stmtctx.StatementContext{TimeZone: time.Local}
-	value, err := tablecodec.EncodeRow(sc, row, colIDs)
+	value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil)
 	c.Assert(err, IsNil)
 	err = txn.Set(key, value)
 	c.Assert(err, IsNil)
diff --git a/util/kvencoder/kv_encoder.go b/util/kvencoder/kv_encoder.go
index 37c4d1d4ec..65d77ea65e 100644
--- a/util/kvencoder/kv_encoder.go
+++ b/util/kvencoder/kv_encoder.go
@@ -226,6 +226,7 @@ func (e *kvEncoder) initial(dbName string, idAlloc autoid.Allocator) (err error)
 	se.GetSessionVars().IDAllocator = idAlloc
 	se.GetSessionVars().ImportingData = true
+	se.GetSessionVars().SkipUTF8Check = true
 	e.se = se
 	e.store = store
 	e.dom = dom
diff --git a/util/mock/context.go b/util/mock/context.go
index 34b95804c1..30d839c760 100644
--- a/util/mock/context.go
+++ b/util/mock/context.go
@@ -111,6 +111,7 @@ func (c *Context) NewTxn() error {
 			return errors.Trace(err)
 		}
 	}
+
 	txn, err := c.Store.Begin()
 	if err != nil {
 		return errors.Trace(err)
 	}
@@ -145,10 +146,15 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error {
 		return nil
 	}
 	if c.Store != nil {
+		membufCap := kv.DefaultTxnMembufCap
+		if c.sessionVars.ImportingData {
+			membufCap = kv.ImportingTxnMembufCap
+		}
		txn, err := c.Store.BeginWithStartTS(startTS)
 		if err != nil {
 			return errors.Trace(err)
 		}
+		txn.SetCap(membufCap)
 		c.txn = txn
 	}
 	return nil
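
The following is an illustrative, self-contained Go sketch (not part of the patch) of the memoization pattern behind colDefaultVals and getColDefaultValue in executor/write.go: evaluate a per-column default once, mark it with a valid flag, and reuse it for every later row. The names cachedDefault and defaultProvider are hypothetical.

package main

import "fmt"

type cachedDefault struct {
	val   string
	valid bool // valid indicates whether val has been evaluated.
}

type defaultProvider struct {
	cache []cachedDefault
	calls int
}

func newDefaultProvider(numCols int) *defaultProvider {
	return &defaultProvider{cache: make([]cachedDefault, numCols)}
}

// defaultFor returns the cached default for column idx, evaluating it only on first use.
func (p *defaultProvider) defaultFor(idx int) string {
	if p.cache[idx].valid {
		return p.cache[idx].val
	}
	p.calls++ // stands in for the expensive table.GetColDefaultValue call
	p.cache[idx].val = fmt.Sprintf("default-%d", idx)
	p.cache[idx].valid = true
	return p.cache[idx].val
}

func main() {
	p := newDefaultProvider(3)
	for row := 0; row < 1000; row++ {
		for col := 0; col < 3; col++ {
			_ = p.defaultFor(col)
		}
	}
	fmt.Println("expensive evaluations:", p.calls) // prints 3, not 3000
}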
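Below is a hedged, self-contained sketch (not part of the patch) of the buffer-reuse idea behind the new EncodeRow(sc, row, colIDs, valBuf, values) signature: the caller owns the output buffer, the encoder truncates it with [:0] and appends into it, so a hot loop stops allocating per row. encodePairs and its varint layout are hypothetical stand-ins, not TiDB's actual row format.

package main

import (
	"encoding/binary"
	"fmt"
)

// encodePairs appends id/value pairs to valBuf and returns the (possibly grown) buffer.
// Passing the previous result back in reuses its capacity on the next call.
func encodePairs(ids []int64, vals []int64, valBuf []byte) ([]byte, error) {
	if len(ids) != len(vals) {
		return nil, fmt.Errorf("encodePairs: id and value count mismatch %d vs %d", len(ids), len(vals))
	}
	valBuf = valBuf[:0] // keep capacity, drop old contents
	for i := range ids {
		valBuf = binary.AppendVarint(valBuf, ids[i])
		valBuf = binary.AppendVarint(valBuf, vals[i])
	}
	return valBuf, nil
}

func main() {
	var buf []byte
	var err error
	for row := int64(0); row < 3; row++ {
		// Reuse buf across iterations, just as AddRecord reuses sessVars.RowValBuf.
		buf, err = encodePairs([]int64{1, 2}, []int64{row, row * 10}, buf)
		if err != nil {
			panic(err)
		}
		fmt.Printf("row %d: % x\n", row, buf)
	}
}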
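Finally, a rough stand-alone sketch (not part of the patch) of the BufferStore/lazyMemBuffer idea: a write buffer created with a capacity hint (small for per-row temporary buffers, larger when importing data) that exposes Reset so one buffer is reused across many rows instead of being reallocated. rowBuffer and its methods are hypothetical stand-ins for kv.MemBuffer.

package main

import "fmt"

const (
	defaultCap   = 4 * 1024  // analogous to kv.DefaultTxnMembufCap
	importingCap = 32 * 1024 // analogous to kv.ImportingTxnMembufCap
	tempCap      = 64        // analogous to kv.TempTxnMemBufCap
)

type rowBuffer struct {
	kvs map[string][]byte
	cap int
}

func newRowBuffer(cap int) *rowBuffer {
	if cap <= 0 {
		cap = defaultCap
	}
	return &rowBuffer{kvs: make(map[string][]byte, cap), cap: cap}
}

func (b *rowBuffer) Set(k string, v []byte) { b.kvs[k] = v }

// Reset drops buffered writes but keeps the buffer object (and its capacity hint) for reuse.
func (b *rowBuffer) Reset() {
	for k := range b.kvs {
		delete(b.kvs, k)
	}
}

func main() {
	buf := newRowBuffer(tempCap) // one buffer per statement, like sessVars.BufStore
	for row := 0; row < 3; row++ {
		buf.Reset() // like bs.Reset() at the top of AddRecord
		buf.Set(fmt.Sprintf("row_%d", row), []byte("value"))
	}
	fmt.Println("buffered keys after last row:", len(buf.kvs))
}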