ddl: Deal with discrete handles (#5102)

* ddl: deal with discrete handles
Lynn
2017-11-19 21:30:06 -06:00
committed by GitHub
parent 14ed94132c
commit a220cff85e
3 changed files with 52 additions and 12 deletions
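For orientation (not part of the commit itself): when a table's handles are discrete, a fixed backfill range such as [baseHandle, baseHandle+128) can contain very few rows, so each add-index batch does little useful work. The sketch below is a rough, standalone illustration of that effect; the handle spacing and counts are invented for the example.

package main

import "fmt"

const defaultTaskHandleCnt = 128 // same default batch width used by the add-index workers below

func main() {
	// Hypothetical discrete handles: one row every 1000 handle values (made up for the example).
	var handles []int64
	for h := int64(0); h < 100000; h += 1000 {
		handles = append(handles, h)
	}

	// A fixed handle range of defaultTaskHandleCnt matches at most one of these rows,
	// so a worker scanning [0, 128) does almost no useful work per batch.
	rows := 0
	for _, h := range handles {
		if h >= 0 && h < defaultTaskHandleCnt {
			rows++
		}
	}
	fmt.Printf("rows found in [0, %d): %d of %d\n", defaultTaskHandleCnt, rows, len(handles))
}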

View File

@@ -458,7 +458,7 @@ func (s *testDBSuite) testAlterLock(c *C) {
func (s *testDBSuite) TestAddIndex(c *C) {
s.tk = testkit.NewTestKit(c, s.store)
s.tk.MustExec("use " + s.schemaName)
s.tk.MustExec("create table test_add_index (c1 int, c2 int, c3 int, primary key(c1))")
s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
done := make(chan error, 1)
start := -10
@@ -483,6 +483,11 @@ func (s *testDBSuite) TestAddIndex(c *C) {
otherKeys = append(otherKeys, n)
}
}
+ // Insert a handle near math.MaxInt64 so a backfill batch reaches the top of the handle space in the middle of its range.
+ v := math.MaxInt64 - defaultBatchSize/2
+ sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
+ s.mustExec(c, sql)
+ otherKeys = append(otherKeys, v)
sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
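The handle inserted above sits defaultBatchSize/2 below math.MaxInt64, so a backfill batch eventually hits the upper bound of the int64 handle space. A standalone sketch of the overflow-safe range end used for that case (getEndHandle is reproduced from the ddl diff below; the surrounding main is only for illustration):

package main

import (
	"fmt"
	"math"
)

// getEndHandle mirrors the helper introduced in the ddl diff below: it clamps the
// end of a batch range to math.MaxInt64 instead of letting baseHandle+batch overflow.
func getEndHandle(baseHandle, batch int64) int64 {
	if baseHandle >= math.MaxInt64-batch {
		return math.MaxInt64
	}
	return baseHandle + batch
}

func main() {
	const batch = 128
	base := int64(math.MaxInt64 - 10)

	fmt.Println(getEndHandle(0, batch))    // 128: the common case
	fmt.Println(getEndHandle(base, batch)) // 9223372036854775807: clamped at math.MaxInt64
	fmt.Println(base+batch < base)         // true: the naive sum would wrap around
}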

View File

@@ -474,7 +474,9 @@ func (w *worker) getIndexRecord(t table.Table, colMap map[int64]*types.FieldType
}
const (
+ minTaskHandledCnt = 32 // minTaskHandledCnt is the minimum number of handles per batch.
defaultTaskHandleCnt = 128
+ maxTaskHandleCnt = 1 << 20 // maxTaskHandleCnt is the maximum number of handles per batch.
defaultWorkers = 16
)
@@ -501,6 +503,7 @@ type worker struct {
idxRecords []*indexRecord // It's used to reduce the number of new slice allocations.
taskRange handleInfo // Every task's handle range.
taskRet *taskResult
+ batchSize int
rowMap map[int64]types.Datum // It's the index column values map. It's used to reduce the number of map allocations.
}
@@ -508,6 +511,7 @@ func newWorker(ctx context.Context, id, batch, colsLen, indexColsLen int) *worker {
return &worker{
id: id,
ctx: ctx,
+ batchSize: batch,
idxRecords: make([]*indexRecord, 0, batch),
defaultVals: make([]types.Datum, colsLen),
rowMap: make(map[int64]types.Datum, indexColsLen),
@@ -525,6 +529,13 @@ type handleInfo struct {
endHandle int64
}
+ func getEndHandle(baseHandle, batch int64) int64 {
+ if baseHandle >= math.MaxInt64-batch {
+ return math.MaxInt64
+ }
+ return baseHandle + batch
+ }
// addTableIndex adds index into table.
// TODO: Move this to doc or wiki.
// How to add index in reorganization state?
@@ -551,25 +562,27 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
colMap[col.ID] = &col.FieldType
}
workerCnt := defaultWorkers
- taskBatch := int64(defaultTaskHandleCnt)
addedCount := job.GetRowCount()
- baseHandle := reorgInfo.Handle
+ baseHandle, logStartHandle := reorgInfo.Handle, reorgInfo.Handle
workers := make([]*worker, workerCnt)
for i := 0; i < workerCnt; i++ {
ctx := d.newContext()
- workers[i] = newWorker(ctx, i, int(taskBatch), len(cols), len(colMap))
+ workers[i] = newWorker(ctx, i, defaultTaskHandleCnt, len(cols), len(colMap))
// Make sure every worker has its own index buffer.
workers[i].index = tables.NewIndexWithBuffer(t.Meta(), indexInfo)
}
for {
startTime := time.Now()
wg := sync.WaitGroup{}
+ currentBatchSize := int64(workers[0].batchSize)
for i := 0; i < workerCnt; i++ {
wg.Add(1)
- workers[i].setTaskNewRange(baseHandle+int64(i)*taskBatch, baseHandle+int64(i+1)*taskBatch)
+ endHandle := getEndHandle(baseHandle, currentBatchSize)
+ workers[i].setTaskNewRange(baseHandle, endHandle)
// TODO: Consider one worker to one goroutine.
go workers[i].doBackfillIndexTask(t, colMap, &wg)
+ baseHandle = endHandle
}
wg.Wait()
@@ -584,19 +597,19 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
})
log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, take time %v, update handle err %v",
addedCount, baseHandle, nextHandle, taskAddedCount, err, sub, err1)
log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, batch %d, take time %v, update handle err %v",
addedCount, logStartHandle, nextHandle, taskAddedCount, err, currentBatchSize, sub, err1)
return errors.Trace(err)
}
d.reorgCtx.setRowCountAndHandle(addedCount, nextHandle)
batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, take time %v",
addedCount, baseHandle, nextHandle, taskAddedCount, sub)
log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, batch %d, take time %v",
addedCount, logStartHandle, nextHandle, taskAddedCount, currentBatchSize, sub)
if isEnd {
return nil
}
- baseHandle = nextHandle
+ baseHandle, logStartHandle = nextHandle, nextHandle
}
}
@@ -604,6 +617,8 @@ func getCountAndHandle(workers []*worker) (int64, int64, bool, error) {
taskAddedCount, nextHandle := int64(0), workers[0].taskRange.startHandle
var err error
var isEnd bool
+ starvingWorkers := 0
+ largerDefaultWorkers := 0
for _, worker := range workers {
ret := worker.taskRet
if ret.err != nil {
@@ -611,9 +626,29 @@
break
}
taskAddedCount += int64(ret.count)
+ if ret.count < minTaskHandledCnt {
+ starvingWorkers++
+ } else if ret.count > defaultTaskHandleCnt {
+ largerDefaultWorkers++
+ }
nextHandle = ret.outOfRangeHandle
isEnd = ret.isAllDone
}
+ // Adjust the workers' batch size.
+ halfWorkers := len(workers) / 2
+ if starvingWorkers >= halfWorkers && workers[0].batchSize < maxTaskHandleCnt {
+ // If the index data is discrete, increase the batch size to speed up backfilling.
+ for _, worker := range workers {
+ worker.batchSize *= 2
+ }
+ } else if largerDefaultWorkers >= halfWorkers && workers[0].batchSize > defaultTaskHandleCnt {
+ // If most workers handle more rows than the default after the batch size was increased,
+ // decrease the batch size again to reduce write conflicts.
+ for _, worker := range workers {
+ worker.batchSize /= 2
+ }
+ }
return taskAddedCount, nextHandle, isEnd, errors.Trace(err)
}
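A condensed, standalone sketch of the batch-size feedback in getCountAndHandle above (the worker structs and row counts here are simplified stand-ins, not the real ddl types): if at least half of the workers handled fewer than minTaskHandledCnt rows, the handles are likely discrete and the batch doubles, up to maxTaskHandleCnt; if at least half handled more than defaultTaskHandleCnt rows, the batch halves back toward the default to limit write conflicts.

package main

import "fmt"

const (
	minTaskHandledCnt    = 32
	defaultTaskHandleCnt = 128
	maxTaskHandleCnt     = 1 << 20
)

// adjustBatchSize is a simplified stand-in for the adjustment in getCountAndHandle:
// counts holds the number of rows each worker handled in the last round.
func adjustBatchSize(batchSize int, counts []int) int {
	starving, larger := 0, 0
	for _, c := range counts {
		if c < minTaskHandledCnt {
			starving++
		} else if c > defaultTaskHandleCnt {
			larger++
		}
	}
	half := len(counts) / 2
	if starving >= half && batchSize < maxTaskHandleCnt {
		return batchSize * 2 // discrete handles: widen the range to find more rows
	}
	if larger >= half && batchSize > defaultTaskHandleCnt {
		return batchSize / 2 // dense again: shrink back to limit write conflicts
	}
	return batchSize
}

func main() {
	batch := defaultTaskHandleCnt
	// Most workers found almost nothing in their range: grow the batch.
	batch = adjustBatchSize(batch, []int{1, 0, 3, 2, 40, 5, 0, 1})
	fmt.Println(batch) // 256
	// Most workers now handle plenty of rows: shrink back toward the default.
	batch = adjustBatchSize(batch, []int{200, 190, 250, 180, 30, 210, 220, 205})
	fmt.Println(batch) // 128
}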

View File

@@ -34,6 +34,7 @@ type testIndexSuite struct {
}
func (s *testIndexSuite) SetUpSuite(c *C) {
+ testleak.BeforeTest()
store, err := tikv.NewMockTikvStore()
c.Assert(err, IsNil)
s.s = store
@@ -42,10 +43,10 @@ func (s *testIndexSuite) SetUpSuite(c *C) {
func (s *testIndexSuite) TearDownSuite(c *C) {
err := s.s.Close()
c.Assert(err, IsNil)
+ testleak.AfterTest(c)()
}
func (s *testIndexSuite) TestIndex(c *C) {
- defer testleak.AfterTest(c)()
tblInfo := &model.TableInfo{
ID: 1,
Indices: []*model.IndexInfo{
@@ -184,7 +185,6 @@
}
func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
- defer testleak.AfterTest(c)()
tblInfo := &model.TableInfo{
ID: 1,
Indices: []*model.IndexInfo{