// Copyright 2017 PingCAP, Inc. // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // // http://www.apache.org/licenses/LICENSE-2.0 // // Unless required by applicable law or agreed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // See the License for the specific language governing permissions and // limitations under the License. package kvenc import ( "bytes" "fmt" "sync/atomic" "github.com/juju/errors" "github.com/pingcap/tidb" "github.com/pingcap/tidb/domain" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/meta/autoid" "github.com/pingcap/tidb/mysql" "github.com/pingcap/tidb/plan" "github.com/pingcap/tidb/store/mockstore" "github.com/pingcap/tidb/tablecodec" log "github.com/sirupsen/logrus" "golang.org/x/net/context" ) var _ KvEncoder = &kvEncoder{} var mockConnID uint64 // KvPair is a key-value pair. type KvPair struct { // Key is the key of the pair. Key []byte // Val is the value of the pair. if the op is delete, the len(Val) == 0 Val []byte } // KvEncoder is an encoder that transfer sql to key-value pairs. type KvEncoder interface { // Encode transfers sql to kv pairs. // Before use Encode() method, please make sure you already created schame by calling ExecDDLSQL() method. // NOTE: now we just support transfers insert statement to kv pairs. // (if we wanna support other statement, we need to add a kv.Storage parameter, // and pass tikv store in.) // return encoded kvs array that generate by sql, and affectRows count. Encode(sql string, tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) // PrepareStmt prepare query statement, and return statement id. // Pass stmtID into EncodePrepareStmt to execute a prepare statement. PrepareStmt(query string) (stmtID uint32, err error) // EncodePrepareStmt transfer prepare query to kv pairs. // stmtID is generated by PrepareStmt. EncodePrepareStmt(tableID int64, stmtID uint32, param ...interface{}) (kvPairs []KvPair, affectedRows uint64, err error) // ExecDDLSQL executes ddl sql, you must use it to create schema infos. ExecDDLSQL(sql string) error // EncodeMetaAutoID encode the table meta info, autoID to coresponding key-value pair. EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error) // Close cleanup the kvEncoder. Close() error } type kvEncoder struct { store kv.Storage dom *domain.Domain se tidb.Session } // New new a KvEncoder func New(dbName string, idAlloc autoid.Allocator) (KvEncoder, error) { kvEnc := &kvEncoder{} err := kvEnc.initial(dbName, idAlloc) if err != nil { return nil, errors.Trace(err) } return kvEnc, nil } func (e *kvEncoder) Close() error { e.dom.Close() if err := e.store.Close(); err != nil { return errors.Trace(err) } return nil } func (e *kvEncoder) Encode(sql string, tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) { e.se.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) defer func() { err1 := e.se.RollbackTxn(context.Background()) if err1 != nil { log.Error(errors.ErrorStack(err1)) } }() _, err = e.se.Execute(context.Background(), sql) if err != nil { return nil, 0, errors.Trace(err) } return e.getKvPairsInMemBuffer(tableID) } func (e *kvEncoder) getKvPairsInMemBuffer(tableID int64) (kvPairs []KvPair, affectedRows uint64, err error) { txnMemBuffer := e.se.Txn().GetMemBuffer() kvPairs = make([]KvPair, 0, txnMemBuffer.Len()) err = kv.WalkMemBuffer(txnMemBuffer, func(k kv.Key, v []byte) error { if bytes.HasPrefix(k, tablecodec.TablePrefix()) { k = tablecodec.ReplaceRecordKeyTableID(k, tableID) } kvPairs = append(kvPairs, KvPair{Key: k, Val: v}) return nil }) if err != nil { return nil, 0, errors.Trace(err) } return kvPairs, e.se.GetSessionVars().StmtCtx.AffectedRows(), nil } func (e *kvEncoder) PrepareStmt(query string) (stmtID uint32, err error) { stmtID, _, _, err = e.se.PrepareStmt(query) return } func (e *kvEncoder) EncodePrepareStmt(tableID int64, stmtID uint32, param ...interface{}) (kvPairs []KvPair, affectedRows uint64, err error) { e.se.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, true) defer func() { err1 := e.se.RollbackTxn(context.Background()) if err1 != nil { log.Error(errors.ErrorStack(err1)) } }() _, err = e.se.ExecutePreparedStmt(context.Background(), stmtID, param...) if err != nil { return nil, 0, errors.Trace(err) } return e.getKvPairsInMemBuffer(tableID) } func (e *kvEncoder) EncodeMetaAutoID(dbID, tableID, autoID int64) (KvPair, error) { mockTxn := kv.NewMockTxn() m := meta.NewMeta(mockTxn) k, v := m.GenAutoTableIDIDKeyValue(dbID, tableID, autoID) return KvPair{Key: k, Val: v}, nil } func (e *kvEncoder) ExecDDLSQL(sql string) error { _, err := e.se.Execute(context.Background(), sql) if err != nil { return errors.Trace(err) } return nil } func newMockTikvWithBootstrap() (kv.Storage, *domain.Domain, error) { store, err := mockstore.NewMockTikvStore() if err != nil { return nil, nil, errors.Trace(err) } tidb.SetSchemaLease(0) dom, err := tidb.BootstrapSession(store) return store, dom, errors.Trace(err) } func (e *kvEncoder) initial(dbName string, idAlloc autoid.Allocator) (err error) { var ( store kv.Storage dom *domain.Domain se tidb.Session ) defer func() { if err == nil { return } if store != nil { if err1 := store.Close(); err1 != nil { log.Error(errors.ErrorStack(err1)) } } if dom != nil { dom.Close() } if se != nil { se.Close() } }() plan.PreparedPlanCacheEnabled = true plan.PreparedPlanCacheCapacity = 10 // disable stats update. tidb.SetStatsLease(0) store, dom, err = newMockTikvWithBootstrap() if err != nil { err = errors.Trace(err) return } se, err = tidb.CreateSession(store) if err != nil { err = errors.Trace(err) return } se.SetConnectionID(atomic.AddUint64(&mockConnID, 1)) _, err = se.Execute(context.Background(), fmt.Sprintf("create database if not exists %s", dbName)) if err != nil { err = errors.Trace(err) return } _, err = se.Execute(context.Background(), fmt.Sprintf("use %s", dbName)) if err != nil { err = errors.Trace(err) return } se.GetSessionVars().IDAllocator = idAlloc se.GetSessionVars().ImportingData = true se.GetSessionVars().SkipUTF8Check = true e.se = se e.store = store e.dom = dom return nil }