diff --git a/ddl/ddl_db_test.go b/ddl/ddl_db_test.go
index 93baf61e51..fe21d2bb08 100644
--- a/ddl/ddl_db_test.go
+++ b/ddl/ddl_db_test.go
@@ -458,7 +458,7 @@ func (s *testDBSuite) testAlterLock(c *C) {
 func (s *testDBSuite) TestAddIndex(c *C) {
 	s.tk = testkit.NewTestKit(c, s.store)
 	s.tk.MustExec("use " + s.schemaName)
-	s.tk.MustExec("create table test_add_index (c1 int, c2 int, c3 int, primary key(c1))")
+	s.tk.MustExec("create table test_add_index (c1 bigint, c2 bigint, c3 bigint, primary key(c1))")
 
 	done := make(chan error, 1)
 	start := -10
@@ -483,6 +483,11 @@ func (s *testDBSuite) TestAddIndex(c *C) {
 			otherKeys = append(otherKeys, n)
 		}
 	}
+	// Encounter the value of math.MaxInt64 in the middle of a batch.
+	v := math.MaxInt64 - defaultBatchSize/2
+	sql := fmt.Sprintf("insert into test_add_index values (%d, %d, %d)", v, v, v)
+	s.mustExec(c, sql)
+	otherKeys = append(otherKeys, v)
 
 	sessionExecInGoroutine(c, s.store, "create index c3_index on test_add_index (c3)", done)
 
diff --git a/ddl/index.go b/ddl/index.go
index 271c7c0386..ac6c32ece6 100644
--- a/ddl/index.go
+++ b/ddl/index.go
@@ -474,7 +474,9 @@ func (w *worker) getIndexRecord(t table.Table, colMap map[int64]*types.FieldType
 }
 
 const (
+	minTaskHandledCnt    = 32 // minTaskHandledCnt is the minimum number of handles per batch.
 	defaultTaskHandleCnt = 128
+	maxTaskHandleCnt     = 1 << 20 // maxTaskHandleCnt is the maximum number of handles per batch.
 	defaultWorkers       = 16
 )
 
@@ -501,6 +503,7 @@ type worker struct {
 	idxRecords  []*indexRecord // It's used to reduce the number of new slice.
 	taskRange   handleInfo     // Every task's handle range.
 	taskRet     *taskResult
+	batchSize   int
 	rowMap      map[int64]types.Datum // It's the index column values map. It is used to reduce the number of making map.
 }
 
@@ -508,6 +511,7 @@ func newWorker(ctx context.Context, id, batch, colsLen, indexColsLen int) *worke
 	return &worker{
 		id:          id,
 		ctx:         ctx,
+		batchSize:   batch,
 		idxRecords:  make([]*indexRecord, 0, batch),
 		defaultVals: make([]types.Datum, colsLen),
 		rowMap:      make(map[int64]types.Datum, indexColsLen),
@@ -525,6 +529,13 @@ type handleInfo struct {
 	endHandle   int64
 }
 
+func getEndHandle(baseHandle, batch int64) int64 {
+	if baseHandle >= math.MaxInt64-batch {
+		return math.MaxInt64
+	}
+	return baseHandle + batch
+}
+
 // addTableIndex adds index into table.
 // TODO: Move this to doc or wiki.
 // How to add index in reorganization state?
@@ -551,25 +562,27 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
 		colMap[col.ID] = &col.FieldType
 	}
 	workerCnt := defaultWorkers
-	taskBatch := int64(defaultTaskHandleCnt)
 	addedCount := job.GetRowCount()
-	baseHandle := reorgInfo.Handle
+	baseHandle, logStartHandle := reorgInfo.Handle, reorgInfo.Handle
 
 	workers := make([]*worker, workerCnt)
 	for i := 0; i < workerCnt; i++ {
 		ctx := d.newContext()
-		workers[i] = newWorker(ctx, i, int(taskBatch), len(cols), len(colMap))
+		workers[i] = newWorker(ctx, i, defaultTaskHandleCnt, len(cols), len(colMap))
 		// Make sure every worker has its own index buffer.
 		workers[i].index = tables.NewIndexWithBuffer(t.Meta(), indexInfo)
 	}
 	for {
 		startTime := time.Now()
 		wg := sync.WaitGroup{}
+		currentBatchSize := int64(workers[0].batchSize)
 		for i := 0; i < workerCnt; i++ {
 			wg.Add(1)
-			workers[i].setTaskNewRange(baseHandle+int64(i)*taskBatch, baseHandle+int64(i+1)*taskBatch)
+			endHandle := getEndHandle(baseHandle, currentBatchSize)
+			workers[i].setTaskNewRange(baseHandle, endHandle)
 			// TODO: Consider one worker to one goroutine.
 			go workers[i].doBackfillIndexTask(t, colMap, &wg)
+			baseHandle = endHandle
 		}
 		wg.Wait()
 
@@ -584,19 +597,19 @@ func (d *ddl) addTableIndex(t table.Table, indexInfo *model.IndexInfo, reorgInfo
 			err1 := kv.RunInNewTxn(d.store, true, func(txn kv.Transaction) error {
 				return errors.Trace(reorgInfo.UpdateHandle(txn, nextHandle))
 			})
-			log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, take time %v, update handle err %v",
-				addedCount, baseHandle, nextHandle, taskAddedCount, err, sub, err1)
+			log.Warnf("[ddl] total added index for %d rows, this task [%d,%d) add index for %d failed %v, batch %d, take time %v, update handle err %v",
+				addedCount, logStartHandle, nextHandle, taskAddedCount, err, currentBatchSize, sub, err1)
 			return errors.Trace(err)
 		}
 		d.reorgCtx.setRowCountAndHandle(addedCount, nextHandle)
 		batchHandleDataHistogram.WithLabelValues(batchAddIdx).Observe(sub)
-		log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, take time %v",
-			addedCount, baseHandle, nextHandle, taskAddedCount, sub)
+		log.Infof("[ddl] total added index for %d rows, this task [%d,%d) added index for %d rows, batch %d, take time %v",
+			addedCount, logStartHandle, nextHandle, taskAddedCount, currentBatchSize, sub)
 
 		if isEnd {
 			return nil
 		}
-		baseHandle = nextHandle
+		baseHandle, logStartHandle = nextHandle, nextHandle
 	}
 }
 
@@ -604,6 +617,8 @@ func getCountAndHandle(workers []*worker) (int64, int64, bool, error) {
 	taskAddedCount, nextHandle := int64(0), workers[0].taskRange.startHandle
 	var err error
 	var isEnd bool
+	starvingWorkers := 0
+	largerDefaultWorkers := 0
 	for _, worker := range workers {
 		ret := worker.taskRet
 		if ret.err != nil {
@@ -611,9 +626,29 @@
 			break
 		}
 		taskAddedCount += int64(ret.count)
+		if ret.count < minTaskHandledCnt {
+			starvingWorkers++
+		} else if ret.count > defaultTaskHandleCnt {
+			largerDefaultWorkers++
+		}
 		nextHandle = ret.outOfRangeHandle
 		isEnd = ret.isAllDone
 	}
+
+	// Adjust the worker's batch size.
+	halfWorkers := len(workers) / 2
+	if starvingWorkers >= halfWorkers && workers[0].batchSize < maxTaskHandleCnt {
+		// If the index data is discrete, we need to increase the batch size to speed up.
+		for _, worker := range workers {
+			worker.batchSize *= 2
+		}
+	} else if largerDefaultWorkers >= halfWorkers && workers[0].batchSize > defaultTaskHandleCnt {
+		// If the batch size exceeds the limit after we increase it,
+		// we need to decrease the batch size to reduce write conflict.
+		for _, worker := range workers {
+			worker.batchSize /= 2
+		}
+	}
 	return taskAddedCount, nextHandle, isEnd, errors.Trace(err)
 }
 
diff --git a/table/tables/index_test.go b/table/tables/index_test.go
index 487182c643..1ebde5a8a1 100644
--- a/table/tables/index_test.go
+++ b/table/tables/index_test.go
@@ -34,6 +34,7 @@ type testIndexSuite struct {
 }
 
 func (s *testIndexSuite) SetUpSuite(c *C) {
+	testleak.BeforeTest()
 	store, err := tikv.NewMockTikvStore()
 	c.Assert(err, IsNil)
 	s.s = store
@@ -42,10 +43,10 @@ func (s *testIndexSuite) SetUpSuite(c *C) {
 func (s *testIndexSuite) TearDownSuite(c *C) {
 	err := s.s.Close()
 	c.Assert(err, IsNil)
+	testleak.AfterTest(c)()
 }
 
 func (s *testIndexSuite) TestIndex(c *C) {
-	defer testleak.AfterTest(c)()
 	tblInfo := &model.TableInfo{
 		ID: 1,
 		Indices: []*model.IndexInfo{
@@ -184,7 +185,6 @@ func (s *testIndexSuite) TestIndex(c *C) {
 }
 
 func (s *testIndexSuite) TestCombineIndexSeek(c *C) {
-	defer testleak.AfterTest(c)()
 	tblInfo := &model.TableInfo{
 		ID: 1,
 		Indices: []*model.IndexInfo{
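Reviewer note: the sketch below is a rough, standalone illustration of the batch-size policy this patch adds to getCountAndHandle, not code taken from the patch. The function adjustBatchSize and the sample counts are invented for the example; only the three constants mirror minTaskHandledCnt, defaultTaskHandleCnt, and maxTaskHandleCnt above. It shows one adjustment round: double the shared batch when at least half of the workers backfilled fewer than 32 rows (the handle range is sparse), and halve it once at least half of them handle more than the default 128, so the batch stays within [128, 1<<20].

// Standalone sketch of the dynamic batch-size policy (illustration only; the
// function and variable names below are not from the patch).
package main

import "fmt"

const (
	minTaskHandledCnt    = 32      // same value as the patch
	defaultTaskHandleCnt = 128     // same value as the patch
	maxTaskHandleCnt     = 1 << 20 // same value as the patch
)

// adjustBatchSize applies one round of the policy: double the batch when at
// least half of the workers handled fewer than minTaskHandledCnt rows, halve
// it when at least half of them handled more than defaultTaskHandleCnt rows.
func adjustBatchSize(batchSize int, handledCounts []int) int {
	starving, larger := 0, 0
	for _, cnt := range handledCounts {
		if cnt < minTaskHandledCnt {
			starving++
		} else if cnt > defaultTaskHandleCnt {
			larger++
		}
	}
	half := len(handledCounts) / 2
	if starving >= half && batchSize < maxTaskHandleCnt {
		return batchSize * 2 // sparse handle range: widen each task to make faster progress
	}
	if larger >= half && batchSize > defaultTaskHandleCnt {
		return batchSize / 2 // dense again: shrink back toward the default to limit write conflicts
	}
	return batchSize
}

func main() {
	batch := defaultTaskHandleCnt

	// A sparse region: every worker backfilled only a handful of rows.
	batch = adjustBatchSize(batch, []int{1, 0, 3, 2})
	fmt.Println(batch) // 256

	// A dense region: every worker handled more than the default count.
	batch = adjustBatchSize(batch, []int{200, 180, 256, 240})
	fmt.Println(batch) // 128
}

Because the adjustment is multiplicative, a long run of empty handle ranges is crossed in logarithmically many rounds, while the upper bound keeps a single backfill task from growing without limit, which is the write-conflict concern the in-code comment mentions.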
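A second, equally hypothetical snippet shows why getEndHandle clamps the end of a task range at math.MaxInt64. The new TestAddIndex row at math.MaxInt64 - defaultBatchSize/2 drives the backfill into this corner: without the guard, baseHandle + batch would overflow int64 and wrap to a negative handle. getEndHandle is copied from the patch; the values in main are arbitrary and only for illustration.

// Minimal check of the overflow guard (getEndHandle is copied from the patch).
package main

import (
	"fmt"
	"math"
)

func getEndHandle(baseHandle, batch int64) int64 {
	if baseHandle >= math.MaxInt64-batch {
		return math.MaxInt64
	}
	return baseHandle + batch
}

func main() {
	const batch = int64(128)

	// Ordinary case: the task range simply advances by one batch.
	fmt.Println(getEndHandle(0, batch)) // 128

	// Near the top of the bigint handle space, as the new TestAddIndex row
	// exercises: the end handle is clamped instead of wrapping around.
	base := int64(math.MaxInt64 - 64)
	fmt.Println(getEndHandle(base, batch) == math.MaxInt64) // true
	fmt.Println(base+batch < 0)                             // true: the unguarded sum would overflow
}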