*: importing data reduce memory allocations. (#5549)

This commit is contained in:
winkyao
2018-01-07 12:48:07 +08:00
committed by Ewan Chou
parent ecbd60c3ff
commit eddff3429a
23 changed files with 253 additions and 72 deletions

View File

@ -346,7 +346,7 @@ func (d *ddl) backfillColumnInTxn(t table.Table, colMeta *columnMeta, handles []
}
newColumnIDs = append(newColumnIDs, colMeta.colID)
newRow = append(newRow, colMeta.defaultVal)
newRowVal, err := tablecodec.EncodeRow(sc, newRow, newColumnIDs)
newRowVal, err := tablecodec.EncodeRow(sc, newRow, newColumnIDs, nil, nil)
if err != nil {
return 0, errors.Trace(err)
}

View File

@ -354,6 +354,7 @@ func (b *executorBuilder) buildInsert(v *plan.Insert) Executor {
}
ivs.Table = v.Table
ivs.initDefaultValBuf()
if v.IsReplace {
return b.buildReplace(ivs)
}
@ -380,6 +381,7 @@ func (b *executorBuilder) buildLoadData(v *plan.LoadData) Executor {
GenColumns: v.GenCols.Columns,
GenExprs: v.GenCols.Exprs,
}
insertVal.initDefaultValBuf()
tableCols := tbl.Cols()
columns, err := insertVal.getColumns(tableCols)
if err != nil {

View File

@ -466,9 +466,11 @@ func (e *DeleteExec) Open(goCtx goctx.Context) error {
// NewLoadDataInfo returns a LoadDataInfo structure, and it's only used for tests now.
func NewLoadDataInfo(row []types.Datum, ctx context.Context, tbl table.Table, cols []*table.Column) *LoadDataInfo {
insertVal := &InsertValues{baseExecutor: newBaseExecutor(nil, ctx), Table: tbl}
insertVal.initDefaultValBuf()
return &LoadDataInfo{
row: row,
insertVal: &InsertValues{baseExecutor: newBaseExecutor(nil, ctx), Table: tbl},
insertVal: insertVal,
Table: tbl,
Ctx: ctx,
columns: cols,
@ -806,6 +808,12 @@ func (e *LoadData) Open(goCtx goctx.Context) error {
return nil
}
type defaultVal struct {
val types.Datum
// We evaluate the default value lazily. The valid field indicates whether val has been evaluated.
valid bool
}
// InsertValues is the data to insert.
type InsertValues struct {
baseExecutor
@ -825,6 +833,9 @@ type InsertValues struct {
GenColumns []*ast.ColumnName
GenExprs []expression.Expression
// colDefaultVals is used to store the cast default values.
colDefaultVals []defaultVal
}
// InsertExec represents an insert executor.
@ -839,13 +850,20 @@ type InsertExec struct {
finished bool
}
func (e *InsertValues) initDefaultValBuf() {
e.colDefaultVals = make([]defaultVal, len(e.Table.Cols()))
}
func (e *InsertExec) exec(goCtx goctx.Context, rows [][]types.Datum) (Row, error) {
// If tidb_batch_insert is ON and not in a transaction, we could use BatchInsert mode.
batchInsert := e.ctx.GetSessionVars().BatchInsert && !e.ctx.GetSessionVars().InTxn()
batchSize := e.ctx.GetSessionVars().DMLBatchSize
sessVars := e.ctx.GetSessionVars()
batchInsert := sessVars.BatchInsert && !sessVars.InTxn()
batchSize := sessVars.DMLBatchSize
txn := e.ctx.Txn()
rowCount := 0
sessVars.BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap)
defer sessVars.CleanBuffers()
for _, row := range rows {
if batchInsert && rowCount >= batchSize {
if err := e.ctx.NewTxn(); err != nil {
@ -854,6 +872,7 @@ func (e *InsertExec) exec(goCtx goctx.Context, rows [][]types.Datum) (Row, error
}
txn = e.ctx.Txn()
rowCount = 0
sessVars.BufStore = kv.NewBufferStore(txn, kv.TempTxnMemBufCap)
}
if len(e.OnDuplicate) == 0 && !e.IgnoreErr {
txn.SetOption(kv.PresumeKeyNotExists, nil)
@ -861,7 +880,9 @@ func (e *InsertExec) exec(goCtx goctx.Context, rows [][]types.Datum) (Row, error
h, err := e.Table.AddRecord(e.ctx, row, false)
txn.DelOption(kv.PresumeKeyNotExists)
if err == nil {
getDirtyDB(e.ctx).addRow(e.Table.Meta().ID, h, row)
if !sessVars.ImportingData {
getDirtyDB(e.ctx).addRow(e.Table.Meta().ID, h, row)
}
rowCount++
continue
}
@ -886,7 +907,7 @@ func (e *InsertExec) exec(goCtx goctx.Context, rows [][]types.Datum) (Row, error
}
if e.lastInsertID != 0 {
e.ctx.GetSessionVars().SetLastInsertID(e.lastInsertID)
sessVars.SetLastInsertID(e.lastInsertID)
}
e.finished = true
return nil, nil
@ -1234,6 +1255,21 @@ func (e *InsertValues) filterErr(err error, ignoreErr bool) error {
return nil
}
func (e *InsertValues) getColDefaultValue(idx int, col *table.Column) (types.Datum, error) {
if e.colDefaultVals[idx].valid {
return e.colDefaultVals[idx].val, nil
}
defaultVal, err := table.GetColDefaultValue(e.ctx, col.ToInfo())
if err != nil {
return types.Datum{}, errors.Trace(err)
}
e.colDefaultVals[idx].val = defaultVal
e.colDefaultVals[idx].valid = true
return defaultVal, nil
}
// initDefaultValues fills generated columns, auto_increment column and empty column.
// For NOT NULL column, it will return error or use zero value based on sql_mode.
func (e *InsertValues) initDefaultValues(row []types.Datum, hasValue []bool, ignoreErr bool) error {
@ -1258,7 +1294,7 @@ func (e *InsertValues) initDefaultValues(row []types.Datum, hasValue []bool, ign
}
if needDefaultValue {
var err error
row[i], err = table.GetColDefaultValue(e.ctx, c.ToInfo())
row[i], err = e.getColDefaultValue(i, c)
if e.filterErr(err, ignoreErr) != nil {
return errors.Trace(err)
}

View File

@ -17,6 +17,15 @@ import (
"github.com/juju/errors"
)
var (
// DefaultTxnMembufCap is the default transaction membuf capacity.
DefaultTxnMembufCap = 4 * 1024
// ImportingTxnMembufCap is the membuf capacity used when TiDB is importing data.
ImportingTxnMembufCap = 32 * 1024
// TempTxnMemBufCap is the capacity of a temporary membuf.
TempTxnMemBufCap = 64
)
// BufferStore wraps a Retriever for read and a MemBuffer for buffered write.
// Common usage pattern:
// bs := NewBufferStore(r) // use BufferStore to wrap a Retriever
@ -30,13 +39,26 @@ type BufferStore struct {
}
// NewBufferStore creates a BufferStore using r for read.
func NewBufferStore(r Retriever) *BufferStore {
func NewBufferStore(r Retriever, cap int) *BufferStore {
if cap <= 0 {
cap = DefaultTxnMembufCap
}
return &BufferStore{
r: r,
MemBuffer: &lazyMemBuffer{},
MemBuffer: &lazyMemBuffer{cap: cap},
}
}
// Reset resets s.MemBuffer.
func (s *BufferStore) Reset() {
s.MemBuffer.Reset()
}
// SetCap sets the MemBuffer capacity.
func (s *BufferStore) SetCap(cap int) {
s.MemBuffer.SetCap(cap)
}
// Get implements the Retriever interface.
func (s *BufferStore) Get(k Key) ([]byte, error) {
val, err := s.MemBuffer.Get(k)

View File

@ -25,7 +25,7 @@ type testBufferStoreSuite struct{}
var _ = Suite(testBufferStoreSuite{})
func (s testBufferStoreSuite) TestGetSet(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer()})
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
key := Key("key")
value, err := bs.Get(key)
c.Check(err, NotNil)
@ -39,7 +39,7 @@ func (s testBufferStoreSuite) TestGetSet(c *C) {
}
func (s testBufferStoreSuite) TestSaveTo(c *C) {
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer()})
bs := NewBufferStore(&mockSnapshot{NewMemDbBuffer(DefaultTxnMembufCap)}, DefaultTxnMembufCap)
var buf bytes.Buffer
for i := 0; i < 10; i++ {
fmt.Fprint(&buf, i)
@ -49,7 +49,7 @@ func (s testBufferStoreSuite) TestSaveTo(c *C) {
}
bs.Set(Key("novalue"), nil)
mutator := NewMemDbBuffer()
mutator := NewMemDbBuffer(DefaultTxnMembufCap)
err := bs.SaveTo(mutator)
c.Check(err, IsNil)

View File

@ -109,6 +109,11 @@ type MemBuffer interface {
Size() int
// Len returns the number of entries in the DB.
Len() int
// Reset cleans up the MemBuffer.
Reset()
// SetCap sets the MemBuffer capacity, to reduce memory allocations.
// Please call it before using the MemBuffer; otherwise it will not work.
SetCap(cap int)
}
// Transaction defines the interface for operations inside a Transaction.

View File

@ -43,11 +43,11 @@ type testKVSuite struct {
func (s *testKVSuite) SetUpSuite(c *C) {
s.bs = make([]MemBuffer, 1)
s.bs[0] = NewMemDbBuffer()
s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
}
func (s *testKVSuite) ResetMembuffers() {
s.bs[0] = NewMemDbBuffer()
s.bs[0] = NewMemDbBuffer(DefaultTxnMembufCap)
}
func insertData(c *C, buffer MemBuffer) {
@ -152,7 +152,7 @@ func (s *testKVSuite) TestNewIterator(c *C) {
func (s *testKVSuite) TestIterNextUntil(c *C) {
defer testleak.AfterTest(c)()
buffer := NewMemDbBuffer()
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
insertData(c, buffer)
iter, err := buffer.Seek(nil)
@ -209,7 +209,7 @@ func (s *testKVSuite) TestNewIteratorMin(c *C) {
}
func (s *testKVSuite) TestBufferLimit(c *C) {
buffer := NewMemDbBuffer().(*memDbBuffer)
buffer := NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
buffer.bufferSizeLimit = 1000
buffer.entrySizeLimit = 500
var err error
@ -222,7 +222,7 @@ func (s *testKVSuite) TestBufferLimit(c *C) {
err = buffer.Set([]byte("yz"), make([]byte, 499))
c.Assert(err, NotNil) // buffer size limit
buffer = NewMemDbBuffer().(*memDbBuffer)
buffer = NewMemDbBuffer(DefaultTxnMembufCap).(*memDbBuffer)
buffer.bufferLenLimit = 10
for i := 0; i < 10; i++ {
err = buffer.Set([]byte{byte(i)}, []byte{byte(i)})
@ -239,7 +239,7 @@ func BenchmarkMemDbBufferSequential(b *testing.B) {
for i := 0; i < opCnt; i++ {
data[i] = encodeInt(i)
}
buffer := NewMemDbBuffer()
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
benchmarkSetGet(b, buffer, data)
b.ReportAllocs()
}
@ -250,20 +250,20 @@ func BenchmarkMemDbBufferRandom(b *testing.B) {
data[i] = encodeInt(i)
}
shuffle(data)
buffer := NewMemDbBuffer()
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
benchmarkSetGet(b, buffer, data)
b.ReportAllocs()
}
func BenchmarkMemDbIter(b *testing.B) {
buffer := NewMemDbBuffer()
buffer := NewMemDbBuffer(DefaultTxnMembufCap)
benchIterator(b, buffer)
b.ReportAllocs()
}
func BenchmarkMemDbCreation(b *testing.B) {
for i := 0; i < b.N; i++ {
NewMemDbBuffer()
NewMemDbBuffer(DefaultTxnMembufCap)
}
b.ReportAllocs()
}

View File

@ -41,9 +41,9 @@ type memDbIter struct {
}
// NewMemDbBuffer creates a new memDbBuffer.
func NewMemDbBuffer() MemBuffer {
func NewMemDbBuffer(cap int) MemBuffer {
return &memDbBuffer{
db: memdb.New(comparer.DefaultComparer, 4*1024),
db: memdb.New(comparer.DefaultComparer, cap),
entrySizeLimit: TxnEntrySizeLimit,
bufferLenLimit: atomic.LoadUint64(&TxnEntryCountLimit),
bufferSizeLimit: TxnTotalSizeLimit,
@ -65,6 +65,10 @@ func (m *memDbBuffer) Seek(k Key) (Iterator, error) {
return i, nil
}
func (m *memDbBuffer) SetCap(cap int) {
}
func (m *memDbBuffer) SeekReverse(k Key) (Iterator, error) {
var i *memDbIter
if k == nil {
@ -120,6 +124,11 @@ func (m *memDbBuffer) Len() int {
return m.db.Len()
}
// Reset cleans up the MemBuffer.
func (m *memDbBuffer) Reset() {
m.db.Reset()
}
// Next implements the Iterator Next.
func (i *memDbIter) Next() error {
if i.reverse {

View File

@ -99,6 +99,14 @@ func (t *mockTxn) GetMemBuffer() MemBuffer {
return nil
}
func (t *mockTxn) SetCap(cap int) {
}
func (t *mockTxn) Reset() {
t.valid = false
}
// NewMockTxn new a mockTxn.
func NewMockTxn() Transaction {
return &mockTxn{
@ -126,7 +134,7 @@ func (s *mockStorage) BeginWithStartTS(startTS uint64) (Transaction, error) {
func (s *mockStorage) GetSnapshot(ver Version) (Snapshot, error) {
return &mockSnapshot{
store: NewMemDbBuffer(),
store: NewMemDbBuffer(DefaultTxnMembufCap),
}, nil
}

View File

@ -68,7 +68,7 @@ type unionStore struct {
// NewUnionStore builds a new UnionStore.
func NewUnionStore(snapshot Snapshot) UnionStore {
return &unionStore{
BufferStore: NewBufferStore(snapshot),
BufferStore: NewBufferStore(snapshot, DefaultTxnMembufCap),
snapshot: snapshot,
lazyConditionPairs: make(map[string]*conditionPair),
opts: make(map[Option]interface{}),
@ -99,7 +99,8 @@ func (it invalidIterator) Close() {}
// lazyMemBuffer wraps a MemBuffer which is to be initialized when it is modified.
type lazyMemBuffer struct {
mb MemBuffer
mb MemBuffer
cap int
}
func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) {
@ -112,7 +113,7 @@ func (lmb *lazyMemBuffer) Get(k Key) ([]byte, error) {
func (lmb *lazyMemBuffer) Set(key Key, value []byte) error {
if lmb.mb == nil {
lmb.mb = NewMemDbBuffer()
lmb.mb = NewMemDbBuffer(lmb.cap)
}
return lmb.mb.Set(key, value)
@ -120,7 +121,7 @@ func (lmb *lazyMemBuffer) Set(key Key, value []byte) error {
func (lmb *lazyMemBuffer) Delete(k Key) error {
if lmb.mb == nil {
lmb.mb = NewMemDbBuffer()
lmb.mb = NewMemDbBuffer(lmb.cap)
}
return lmb.mb.Delete(k)
@ -154,6 +155,16 @@ func (lmb *lazyMemBuffer) Len() int {
return lmb.mb.Len()
}
func (lmb *lazyMemBuffer) Reset() {
if lmb.mb != nil {
lmb.mb.Reset()
}
}
func (lmb *lazyMemBuffer) SetCap(cap int) {
lmb.cap = cap
}
// Get implements the Retriever interface.
func (us *unionStore) Get(k Key) ([]byte, error) {
v, err := us.MemBuffer.Get(k)
@ -233,10 +244,20 @@ func (us *unionStore) GetOption(opt Option) interface{} {
return us.opts[opt]
}
// GetMemBuffer return the MemBuffer binding to this UnionStore.
func (us *unionStore) GetMemBuffer() MemBuffer {
return us.BufferStore.MemBuffer
}
// SetCap sets the membuf capacity.
func (us *unionStore) SetCap(cap int) {
us.BufferStore.SetCap(cap)
}
func (us *unionStore) Reset() {
us.BufferStore.Reset()
}
type options map[Option]interface{}
func (opts options) Get(opt Option) (interface{}, bool) {

View File

@ -27,7 +27,7 @@ type testUnionStoreSuite struct {
}
func (s *testUnionStoreSuite) SetUpTest(c *C) {
s.store = NewMemDbBuffer()
s.store = NewMemDbBuffer(DefaultTxnMembufCap)
s.us = NewUnionStore(&mockSnapshot{s.store})
}

View File

@ -23,7 +23,7 @@ type testUtilsSuite struct {
}
func (s testUtilsSuite) TestIncInt64(c *C) {
mb := NewMemDbBuffer()
mb := NewMemDbBuffer(DefaultTxnMembufCap)
key := Key("key")
v, err := IncInt64(mb, key, 1)
c.Check(err, IsNil)
@ -38,7 +38,7 @@ func (s testUtilsSuite) TestIncInt64(c *C) {
}
func (s testUtilsSuite) TestGetInt64(c *C) {
mb := NewMemDbBuffer()
mb := NewMemDbBuffer(DefaultTxnMembufCap)
key := Key("key")
v, err := GetInt64(mb, key)
c.Check(v, Equals, int64(0))

View File

@ -135,6 +135,14 @@ type session struct {
statsCollector *statistics.SessionStatsCollector
}
func (s *session) getMembufCap() int {
if s.sessionVars.ImportingData {
return kv.ImportingTxnMembufCap
}
return kv.DefaultTxnMembufCap
}
func (s *session) cleanRetryInfo() {
if !s.sessionVars.RetryInfo.Retrying {
retryInfo := s.sessionVars.RetryInfo
@ -882,6 +890,7 @@ func (s *session) NewTxn() error {
if err != nil {
return errors.Trace(err)
}
txn.SetCap(s.getMembufCap())
s.txn = txn
s.sessionVars.TxnCtx.StartTS = txn.StartTS()
return nil
@ -1276,10 +1285,12 @@ func (s *session) ActivePendingTxn() error {
}
future := s.txnFuture
s.txnFuture = nil
txn, err := future.wait()
if err != nil {
return errors.Trace(err)
}
txn.SetCap(s.getMembufCap())
s.txn = txn
s.sessionVars.TxnCtx.StartTS = s.txn.StartTS()
if s.sessionVars.Systems[variable.TxnIsolation] == ast.ReadCommitted {
@ -1296,6 +1307,7 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
if s.txnFuture == nil {
return errors.New("transaction channel is not set")
}
// No need to get the txn from txnFutureCh since the txn should be initialized with startTS.
s.txnFuture = nil
var err error
@ -1303,6 +1315,11 @@ func (s *session) InitTxnWithStartTS(startTS uint64) error {
if err != nil {
return errors.Trace(err)
}
s.txn.SetCap(s.getMembufCap())
err = s.loadCommonGlobalVariablesIfNeeded()
if err != nil {
return errors.Trace(err)
}
return nil
}

View File

@ -18,10 +18,12 @@ import (
"sync"
"time"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/meta/autoid"
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/terror"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util/auth"
)
@ -248,6 +250,14 @@ type SessionVars struct {
// EnableChunk indicates whether the chunk execution model is enabled.
// TODO: remove this after tidb-server configuration "enable-chunk' removed.
EnableChunk bool
// RowValBuf is used by tablecodec.EncodeRow, to reduce runtime.growslice.
RowValBuf []byte
// BufStore stores temp KVs for a row when executing insert statement.
// We could reuse a BufStore for multiple rows of a session to reduce memory allocations.
BufStore *kv.BufferStore
// AddRowValues is used to store temporary insert row values, to reduce memory allocations when importing data.
AddRowValues []types.Datum
}
// NewSessionVars creates a session vars object.
@ -275,6 +285,13 @@ func NewSessionVars() *SessionVars {
}
}
// CleanBuffers cleans up the temporary buffers.
func (s *SessionVars) CleanBuffers() {
s.RowValBuf = nil
s.BufStore = nil
s.AddRowValues = nil
}
// GetCharsetInfo gets charset and collation for current context.
// What character set should the server translate a statement to after receiving it?
// For this, the server uses the character_set_connection and collation_connection system variables.

View File

@ -60,7 +60,7 @@ func (s *testClusterSuite) TestClusterSplit(c *C) {
rowKey := tablecodec.EncodeRowKeyWithHandle(tblID, handle)
colValue := types.NewStringDatum(strconv.Itoa(int(handle)))
// TODO: Should use session's TimeZone instead of UTC.
rowValue, err1 := tablecodec.EncodeRow(sc, []types.Datum{colValue}, []int64{colID})
rowValue, err1 := tablecodec.EncodeRow(sc, []types.Datum{colValue}, []int64{colID}, nil, nil)
c.Assert(err1, IsNil)
txn.Set(rowKey, rowValue)

View File

@ -65,6 +65,16 @@ func newTikvTxnWithStartTS(store *tikvStore, startTS uint64) (*tikvTxn, error) {
}, nil
}
// SetCap sets the transaction's MemBuffer capacity, to reduce memory allocations.
func (txn *tikvTxn) SetCap(cap int) {
txn.us.SetCap(cap)
}
// Reset resets tikvTxn's membuf.
func (txn *tikvTxn) Reset() {
txn.us.Reset()
}
// Get implements transaction interface.
func (txn *tikvTxn) Get(k kv.Key) ([]byte, error) {
txnCmdCounter.WithLabelValues("get").Inc()

View File

@ -30,12 +30,13 @@ import (
"github.com/pingcap/tidb/mysql"
"github.com/pingcap/tidb/sessionctx/binloginfo"
"github.com/pingcap/tidb/sessionctx/stmtctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/table"
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/types"
"github.com/pingcap/tidb/util"
"github.com/pingcap/tidb/util/codec"
"github.com/pingcap/tipb/go-binlog"
binlog "github.com/pingcap/tipb/go-binlog"
log "github.com/sirupsen/logrus"
"github.com/spaolacci/murmur3"
)
@ -215,7 +216,8 @@ func (t *Table) FirstKey() kv.Key {
// Length of `oldData` and `newData` equals to length of `t.WritableCols()`.
func (t *Table) UpdateRecord(ctx context.Context, h int64, oldData, newData []types.Datum, touched []bool) error {
txn := ctx.Txn()
bs := kv.NewBufferStore(txn)
// TODO: reuse bs, like AddRecord does.
bs := kv.NewBufferStore(txn, kv.DefaultTxnMembufCap)
// rebuild index
err := t.rebuildIndices(ctx, bs, h, touched, oldData, newData)
@ -255,7 +257,7 @@ func (t *Table) UpdateRecord(ctx context.Context, h int64, oldData, newData []ty
}
key := t.RecordKey(h)
value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs)
value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
if err != nil {
return errors.Trace(err)
}
@ -308,6 +310,17 @@ func (t *Table) rebuildIndices(ctx context.Context, rm kv.RetrieverMutator, h in
return nil
}
// adjustRowValuesBuf adjusts the length of sessVars.AddRowValues, which stores the inserted values used
// by tablecodec.EncodeRow. The encoded row format is `id1, colval, id2, colval`, so the correct length is rowLen * 2. If
// the inserted row has null values, AddRecord will skip them, so rowLen may differ and we need to adjust it.
func adjustRowValuesBuf(sessVars *variable.SessionVars, rowLen int) {
adjustLen := rowLen * 2
if sessVars.AddRowValues == nil || cap(sessVars.AddRowValues) < adjustLen {
sessVars.AddRowValues = make([]types.Datum, adjustLen)
}
sessVars.AddRowValues = sessVars.AddRowValues[:adjustLen]
}
// AddRecord implements table.Table AddRecord interface.
func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck bool) (recordID int64, err error) {
var hasRecordID bool
@ -326,9 +339,14 @@ func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck
}
txn := ctx.Txn()
bs := kv.NewBufferStore(txn)
skipCheck := ctx.GetSessionVars().ImportingData
sessVars := ctx.GetSessionVars()
// When ImportingData is true, there is no need to check the key constraints, so we name the variable skipCheck.
skipCheck := sessVars.ImportingData
bs := sessVars.BufStore
if bs == nil {
bs = kv.NewBufferStore(ctx.Txn(), kv.DefaultTxnMembufCap)
}
bs.Reset()
if skipCheck {
txn.SetOption(kv.SkipCheckForWrite, true)
}
@ -360,12 +378,14 @@ func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck
row = append(row, value)
}
}
adjustRowValuesBuf(sessVars, len(row))
key := t.RecordKey(recordID)
value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs)
sessVars.RowValBuf, err = tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, sessVars.RowValBuf, sessVars.AddRowValues)
if err != nil {
return 0, errors.Trace(err)
}
value := sessVars.RowValBuf
if err = txn.Set(key, value); err != nil {
return 0, errors.Trace(err)
}
@ -381,8 +401,8 @@ func (t *Table) AddRecord(ctx context.Context, r []types.Datum, skipHandleCheck
return 0, errors.Trace(err)
}
}
ctx.GetSessionVars().StmtCtx.AddAffectedRows(1)
ctx.GetSessionVars().TxnCtx.UpdateDeltaForTable(t.ID, 1, 1)
sessVars.StmtCtx.AddAffectedRows(1)
sessVars.TxnCtx.UpdateDeltaForTable(t.ID, 1, 1)
return recordID, nil
}
@ -526,7 +546,7 @@ func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum,
if err != nil {
return errors.Trace(err)
}
value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs)
value, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, row, colIDs, nil, nil)
if err != nil {
return errors.Trace(err)
}
@ -537,11 +557,11 @@ func (t *Table) addInsertBinlog(ctx context.Context, h int64, row []types.Datum,
}
func (t *Table) addUpdateBinlog(ctx context.Context, oldRow, newRow []types.Datum, colIDs []int64) error {
old, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs)
old, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, oldRow, colIDs, nil, nil)
if err != nil {
return errors.Trace(err)
}
newVal, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs)
newVal, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, newRow, colIDs, nil, nil)
if err != nil {
return errors.Trace(err)
}
@ -553,7 +573,7 @@ func (t *Table) addUpdateBinlog(ctx context.Context, oldRow, newRow []types.Datu
}
func (t *Table) addDeleteBinlog(ctx context.Context, r []types.Datum, colIDs []int64) error {
data, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, r, colIDs)
data, err := tablecodec.EncodeRow(ctx.GetSessionVars().StmtCtx, r, colIDs, nil, nil)
if err != nil {
return errors.Trace(err)
}

View File

@ -159,7 +159,8 @@ func DecodeRowKey(key kv.Key) (int64, error) {
// EncodeValue encodes a go value to bytes.
func EncodeValue(sc *stmtctx.StatementContext, raw types.Datum) ([]byte, error) {
v, err := flatten(raw, sc.TimeZone)
var v types.Datum
err := flatten(raw, sc.TimeZone, &v)
if err != nil {
return nil, errors.Trace(err)
}
@ -169,29 +170,32 @@ func EncodeValue(sc *stmtctx.StatementContext, raw types.Datum) ([]byte, error)
// EncodeRow encode row data and column ids into a slice of byte.
// Row layout: colID1, value1, colID2, value2, .....
func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64) ([]byte, error) {
// valBuf and values are passed in by the caller to reduce temporary buffer allocations in EncodeRow. If valBuf and
// values are nil, EncodeRow will allocate them.
func EncodeRow(sc *stmtctx.StatementContext, row []types.Datum, colIDs []int64, valBuf []byte, values []types.Datum) ([]byte, error) {
if len(row) != len(colIDs) {
return nil, errors.Errorf("EncodeRow error: data and columnID count not match %d vs %d", len(row), len(colIDs))
}
values := make([]types.Datum, 2*len(row))
valBuf = valBuf[:0]
if values == nil {
values = make([]types.Datum, len(row)*2)
}
for i, c := range row {
id := colIDs[i]
idv := types.NewIntDatum(id)
values[2*i] = idv
fc, err := flatten(c, sc.TimeZone)
values[2*i].SetInt64(id)
err := flatten(c, sc.TimeZone, &values[2*i+1])
if err != nil {
return nil, errors.Trace(err)
}
values[2*i+1] = fc
}
if len(values) == 0 {
// We could not set nil value into kv.
return []byte{codec.NilFlag}, nil
}
return codec.EncodeValue(sc, nil, values...)
return codec.EncodeValue(sc, valBuf, values...)
}
func flatten(data types.Datum, loc *time.Location) (types.Datum, error) {
func flatten(data types.Datum, loc *time.Location, ret *types.Datum) error {
switch data.Kind() {
case types.KindMysqlTime:
// for mysql datetime, timestamp and date type
@ -199,31 +203,33 @@ func flatten(data types.Datum, loc *time.Location) (types.Datum, error) {
if t.Type == mysql.TypeTimestamp && loc != time.UTC {
err := t.ConvertTimeZone(loc, time.UTC)
if err != nil {
return data, errors.Trace(err)
return errors.Trace(err)
}
}
v, err := t.ToPackedUint()
return types.NewUintDatum(v), errors.Trace(err)
ret.SetUint64(v)
return errors.Trace(err)
case types.KindMysqlDuration:
// for mysql time type
data.SetInt64(int64(data.GetMysqlDuration().Duration))
return data, nil
ret.SetInt64(int64(data.GetMysqlDuration().Duration))
return nil
case types.KindMysqlEnum:
data.SetUint64(data.GetMysqlEnum().Value)
return data, nil
ret.SetUint64(data.GetMysqlEnum().Value)
return nil
case types.KindMysqlSet:
data.SetUint64(data.GetMysqlSet().Value)
return data, nil
ret.SetUint64(data.GetMysqlSet().Value)
return nil
case types.KindBinaryLiteral, types.KindMysqlBit:
// We don't need to handle errors here since the literal is ensured to be able to store in uint64 in convertToMysqlBit.
val, err := data.GetBinaryLiteral().ToInt()
if err != nil {
return data, errors.Trace(err)
return errors.Trace(err)
}
data.SetUint64(val)
return data, nil
ret.SetUint64(val)
return nil
default:
return data, nil
*ret = data
return nil
}
}

View File

@ -73,7 +73,7 @@ func (s *testTableCodecSuite) TestRowCodec(c *C) {
colIDs = append(colIDs, col.id)
}
sc := &stmtctx.StatementContext{TimeZone: time.Local}
bs, err := EncodeRow(sc, row, colIDs)
bs, err := EncodeRow(sc, row, colIDs, nil, nil)
c.Assert(err, IsNil)
c.Assert(bs, NotNil)
@ -128,7 +128,7 @@ func (s *testTableCodecSuite) TestRowCodec(c *C) {
}
// Make sure empty row return not nil value.
bs, err = EncodeRow(sc, []types.Datum{}, []int64{})
bs, err = EncodeRow(sc, []types.Datum{}, []int64{}, nil, nil)
c.Assert(err, IsNil)
c.Assert(bs, HasLen, 1)
@ -163,7 +163,7 @@ func (s *testTableCodecSuite) TestTimeCodec(c *C) {
colIDs = append(colIDs, col.id)
}
sc := &stmtctx.StatementContext{TimeZone: time.Local}
bs, err := EncodeRow(sc, row, colIDs)
bs, err := EncodeRow(sc, row, colIDs, nil, nil)
c.Assert(err, IsNil)
c.Assert(bs, NotNil)
@ -213,7 +213,7 @@ func (s *testTableCodecSuite) TestCutRow(c *C) {
for _, col := range cols {
colIDs = append(colIDs, col.id)
}
bs, err := EncodeRow(sc, row, colIDs)
bs, err := EncodeRow(sc, row, colIDs, nil, nil)
c.Assert(err, IsNil)
c.Assert(bs, NotNil)

View File

@ -20,6 +20,7 @@ import (
"strconv"
"strings"
"time"
"unicode/utf8"
"github.com/juju/errors"
"github.com/pingcap/tidb/mysql"
@ -798,7 +799,7 @@ func ProduceStrWithSpecifiedTp(s string, tp *FieldType, sc *stmtctx.StatementCon
// Flen is the rune length, not binary length, for UTF8 charset, we need to calculate the
// rune count and truncate to Flen runes if it is too long.
if chs == charset.CharsetUTF8 || chs == charset.CharsetUTF8MB4 {
characterLen := len([]rune(s))
characterLen := utf8.RuneCountInString(s)
if characterLen > flen {
// 1. If len(s) is 0 and flen is 0, truncateLen will be 0, don't truncate s.
// CREATE TABLE t (a char(0));

View File

@ -501,7 +501,7 @@ func setColValue(c *C, txn kv.Transaction, key kv.Key, v types.Datum) {
row := []types.Datum{v, {}}
colIDs := []int64{2, 3}
sc := &stmtctx.StatementContext{TimeZone: time.Local}
value, err := tablecodec.EncodeRow(sc, row, colIDs)
value, err := tablecodec.EncodeRow(sc, row, colIDs, nil, nil)
c.Assert(err, IsNil)
err = txn.Set(key, value)
c.Assert(err, IsNil)

View File

@ -226,6 +226,7 @@ func (e *kvEncoder) initial(dbName string, idAlloc autoid.Allocator) (err error)
se.GetSessionVars().IDAllocator = idAlloc
se.GetSessionVars().ImportingData = true
se.GetSessionVars().SkipUTF8Check = true
e.se = se
e.store = store
e.dom = dom

View File

@ -111,6 +111,7 @@ func (c *Context) NewTxn() error {
return errors.Trace(err)
}
}
txn, err := c.Store.Begin()
if err != nil {
return errors.Trace(err)
@ -145,10 +146,15 @@ func (c *Context) InitTxnWithStartTS(startTS uint64) error {
return nil
}
if c.Store != nil {
membufCap := kv.DefaultTxnMembufCap
if c.sessionVars.ImportingData {
membufCap = kv.ImportingTxnMembufCap
}
txn, err := c.Store.BeginWithStartTS(startTS)
if err != nil {
return errors.Trace(err)
}
txn.SetCap(membufCap)
c.txn = txn
}
return nil