ddl: add create/drop index test.

This commit is contained in:
siddontang
2015-10-30 20:49:04 +08:00
parent dd20b230eb
commit 6399bdd46c
3 changed files with 176 additions and 11 deletions

View File

@ -14,6 +14,7 @@
package ddl
import (
"bytes"
"io"
"strings"
@ -345,7 +346,7 @@ func fetchSnapRowColVals(snap kv.MvccSnapshot, ver kv.Version, t table.Table, ha
col := cols[v.Offset]
k := t.RecordKey(handle, col)
data, err := snap.MvccGet([]byte(k), ver)
data, err := snap.MvccGet(kv.EncodeKey([]byte(k)), ver)
if err != nil {
return nil, errors.Trace(err)
}
@ -415,7 +416,7 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
defer snap.MvccRelease()
firstKey := t.FirstKey()
prefix := t.KeyPrefix()
prefix := []byte(t.KeyPrefix())
ctx := d.newReorgContext()
txn, err := ctx.GetTxn(true)
@ -424,14 +425,19 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
}
defer txn.Rollback()
it := snap.NewMvccIterator([]byte(firstKey), ver)
it := snap.NewMvccIterator(kv.EncodeKey([]byte(firstKey)), ver)
defer it.Close()
kvX := kv.NewKVIndex(t.IndexPrefix(), indexInfo.Name.L, indexInfo.Unique)
for it.Valid() && strings.HasPrefix(it.Key(), prefix) {
for it.Valid() {
key := kv.DecodeKey([]byte(it.Key()))
if !bytes.HasPrefix(key, prefix) {
break
}
var handle int64
handle, err = util.DecodeHandleFromRowKey(it.Key())
handle, err = util.DecodeHandleFromRowKey(string(key))
if err != nil {
return errors.Trace(err)
}
@ -474,9 +480,11 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, version u
}
}
rk := []byte(t.RecordKey(handle, nil))
rk := kv.EncodeKey([]byte(t.RecordKey(handle, nil)))
it, err = kv.NextUntil(it, util.RowKeyPrefixFilter(rk))
if err != nil {
if errors2.ErrorEqual(err, kv.ErrNotExist) {
break
} else if err != nil {
return errors.Trace(err)
}
}

148
ddl/index_test.go Normal file
View File

@ -0,0 +1,148 @@
// Copyright 2015 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// See the License for the specific language governing permissions and
// limitations under the License.
package ddl
import (
"io"
"time"
"github.com/ngaut/log"
. "github.com/pingcap/check"
"github.com/pingcap/tidb/column"
"github.com/pingcap/tidb/context"
"github.com/pingcap/tidb/kv"
"github.com/pingcap/tidb/model"
"github.com/pingcap/tidb/parser/coldef"
"github.com/pingcap/tidb/util/mock"
)
var _ = Suite(&testIndexSuite{})
type testIndexSuite struct {
store kv.Storage
dbInfo *model.DBInfo
d *ddl
}
func (s *testIndexSuite) SetUpSuite(c *C) {
s.store = testCreateStore(c, "test_index")
lease := 50 * time.Millisecond
s.d = newDDL(s.store, nil, nil, lease)
s.dbInfo = testSchemaInfo(c, s.d, "test")
testCreateSchema(c, mock.NewContext(), s.d, s.dbInfo)
}
func (s *testIndexSuite) TearDownSuite(c *C) {
testDropSchema(c, mock.NewContext(), s.d, s.dbInfo)
s.d.close()
s.store.Close()
}
func (s *testIndexSuite) testCreateIndex(c *C, ctx context.Context, tblInfo *model.TableInfo, unique bool, indexName string, colName string) *model.Job {
job := &model.Job{
SchemaID: s.dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionAddIndex,
Args: []interface{}{unique, model.NewCIStr(indexName), []*coldef.IndexColName{&coldef.IndexColName{ColumnName: colName, Length: 256}}},
}
err := s.d.startJob(ctx, job)
c.Assert(err, IsNil)
return job
}
func (s *testIndexSuite) testDropIndex(c *C, ctx context.Context, tblInfo *model.TableInfo, indexName string) *model.Job {
job := &model.Job{
SchemaID: s.dbInfo.ID,
TableID: tblInfo.ID,
Type: model.ActionDropIndex,
Args: []interface{}{model.NewCIStr(indexName)},
}
err := s.d.startJob(ctx, job)
c.Assert(err, IsNil)
return job
}
func (s *testIndexSuite) TestIndex(c *C) {
tblInfo := testTableInfo(c, s.d, "t1")
ctx := testNewContext(c, s.d)
defer ctx.FinishTxn(true)
txn, err := ctx.GetTxn(true)
c.Assert(err, IsNil)
testCreateTable(c, ctx, s.d, s.dbInfo, tblInfo)
t := testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
for i := 0; i < 10; i++ {
_, err := t.AddRecord(ctx, []interface{}{i, i, i})
c.Assert(err, IsNil)
}
err = ctx.FinishTxn(false)
c.Assert(err, IsNil)
i := int64(0)
t.IterRecords(ctx, t.FirstKey(), t.Cols(), func(h int64, data []interface{}, cols []*column.Col) (bool, error) {
c.Assert(data[0], Equals, i)
i++
return true, nil
})
job := s.testCreateIndex(c, ctx, tblInfo, true, "c1_uni", "c1")
testCheckJobDone(c, s.d, job, true)
t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
index := t.FindIndexByColName("c1")
c.Assert(index, NotNil)
h, err := t.AddRecord(ctx, []interface{}{11, 11, 11})
c.Assert(err, IsNil)
h1, err := t.AddRecord(ctx, []interface{}{11, 11, 11})
c.Assert(err, NotNil)
c.Assert(h, Equals, h1)
_, err = t.AddRecord(ctx, []interface{}{1, 1, 1})
c.Assert(err, NotNil)
it, _, err := index.X.Seek(txn, []interface{}{1})
c.Assert(err, IsNil)
c.Assert(it, NotNil)
_, h2, err := it.Next()
c.Assert(err, IsNil)
c.Assert(h, Equals, h2)
it.Close()
s.testDropIndex(c, ctx, tblInfo, "c1_uni")
t = testGetTable(c, s.d, s.dbInfo.ID, tblInfo.ID)
index1 := t.FindIndexByColName("c1")
c.Assert(index1, IsNil)
it, _, _ = index.X.Seek(txn, []interface{}{1})
c.Assert(it, NotNil)
_, _, err = it.Next()
c.Assert(err.Error(), Equals, io.EOF.Error())
}
func init() {
log.SetLevelByString("info")
}

View File

@ -111,9 +111,18 @@ func testCheckTableState(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.Tabl
})
}
func testGetTable(c *C, d *ddl, dbInfo *model.DBInfo, tblInfo *model.TableInfo) table.Table {
alloc := autoid.NewAllocator(d.store, dbInfo.ID)
tbl := table.TableFromMeta(dbInfo.Name.L, alloc, tblInfo)
func testGetTable(c *C, d *ddl, schemaID int64, tableID int64) table.Table {
var tblInfo *model.TableInfo
kv.RunInNewTxn(d.store, false, func(txn kv.Transaction) error {
t := meta.NewMeta(txn)
var err error
tblInfo, err = t.GetTable(schemaID, tableID)
c.Assert(err, IsNil)
c.Assert(tblInfo, NotNil)
return nil
})
alloc := autoid.NewAllocator(d.store, schemaID)
tbl := table.TableFromMeta("", alloc, tblInfo)
return tbl
}
@ -162,7 +171,7 @@ func (s *testTableSuite) TestTable(c *C) {
c.Assert(err, NotNil)
testCheckJobCancelled(c, d, job)
tbl := testGetTable(c, d, s.dbInfo, tblInfo)
tbl := testGetTable(c, d, s.dbInfo.ID, tblInfo.ID)
_, err = tbl.AddRecord(ctx, []interface{}{1, 1, 1})
c.Assert(err, IsNil)