diff --git a/ddl/ddl.go b/ddl/ddl.go index 33f5f42836..465ffa004f 100644 --- a/ddl/ddl.go +++ b/ddl/ddl.go @@ -357,13 +357,12 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) { terror.Log(errors.Trace(err)) d.workers = make(map[workerType]*worker, 2) - d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool) - d.workers[addIdxWorker] = newWorker(addIdxWorker, 1, d.store, ctxPool) + d.workers[generalWorker] = newWorker(generalWorker, d.store, ctxPool) + d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, ctxPool) for _, worker := range d.workers { worker.wg.Add(1) go worker.start(d.ddlCtx) - // TODO: Add the type of DDL worker. - metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc() + metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDLWorker, worker.String())).Inc() // When the start function is called, we will send a fake job to let worker // checks owner firstly and try to find whether a job exists and run. @@ -419,13 +418,6 @@ func (d *ddl) genGlobalID() (int64, error) { return globalID, errors.Trace(err) } -// generalWorker returns the general worker. -// It's used for testing. -// TODO: Remove this function. -func (d *ddl) generalWorker() *worker { - return d.workers[generalWorker] -} - // SchemaSyncer implements DDL.SchemaSyncer interface. func (d *ddl) SchemaSyncer() SchemaSyncer { return d.schemaSyncer diff --git a/ddl/ddl_test.go b/ddl/ddl_test.go index cc795ccae7..a366a758a9 100644 --- a/ddl/ddl_test.go +++ b/ddl/ddl_test.go @@ -56,6 +56,11 @@ func (d *ddl) SetInterceptoror(i Interceptor) { d.mu.interceptor = i } +// generalWorker returns the general worker. +func (d *ddl) generalWorker() *worker { + return d.workers[generalWorker] +} + func TestT(t *testing.T) { CustomVerboseFlag = true logLevel := os.Getenv("log_level") diff --git a/ddl/ddl_worker.go b/ddl/ddl_worker.go index 9791e0ad16..2ed9d6cb48 100644 --- a/ddl/ddl_worker.go +++ b/ddl/ddl_worker.go @@ -16,6 +16,7 @@ package ddl import ( "fmt" "sync" + "sync/atomic" "time" "github.com/juju/errors" @@ -32,8 +33,12 @@ import ( "golang.org/x/net/context" ) -// RunWorker indicates if this TiDB server starts DDL worker and can run DDL job. -var RunWorker = true +var ( + // RunWorker indicates if this TiDB server starts DDL worker and can run DDL job. + RunWorker = true + // ddlWorkerID is used for generating the next DDL worker ID. + ddlWorkerID = int32(0) +) type workerType byte @@ -51,7 +56,7 @@ const ( // worker is used for handling DDL jobs. // Now we have two kinds of workers. type worker struct { - id int + id int32 tp workerType ddlJobCh chan struct{} quitCh chan struct{} @@ -61,9 +66,9 @@ type worker struct { delRangeManager delRangeManager } -func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourcePool) *worker { +func newWorker(tp workerType, store kv.Storage, ctxPool *pools.ResourcePool) *worker { worker := &worker{ - id: id, + id: atomic.AddInt32(&ddlWorkerID, 1), tp: tp, ddlJobCh: make(chan struct{}, 1), quitCh: make(chan struct{}),