ddl: add metrics and clean up (#7168)
This commit is contained in:
14
ddl/ddl.go
14
ddl/ddl.go
@ -357,13 +357,12 @@ func (d *ddl) start(ctx context.Context, ctxPool *pools.ResourcePool) {
|
||||
terror.Log(errors.Trace(err))
|
||||
|
||||
d.workers = make(map[workerType]*worker, 2)
|
||||
d.workers[generalWorker] = newWorker(generalWorker, 0, d.store, ctxPool)
|
||||
d.workers[addIdxWorker] = newWorker(addIdxWorker, 1, d.store, ctxPool)
|
||||
d.workers[generalWorker] = newWorker(generalWorker, d.store, ctxPool)
|
||||
d.workers[addIdxWorker] = newWorker(addIdxWorker, d.store, ctxPool)
|
||||
for _, worker := range d.workers {
|
||||
worker.wg.Add(1)
|
||||
go worker.start(d.ddlCtx)
|
||||
// TODO: Add the type of DDL worker.
|
||||
metrics.DDLCounter.WithLabelValues(metrics.CreateDDLWorker).Inc()
|
||||
metrics.DDLCounter.WithLabelValues(fmt.Sprintf("%s_%s", metrics.CreateDDLWorker, worker.String())).Inc()
|
||||
|
||||
// When the start function is called, we will send a fake job to let worker
|
||||
// checks owner firstly and try to find whether a job exists and run.
|
||||
@ -419,13 +418,6 @@ func (d *ddl) genGlobalID() (int64, error) {
|
||||
return globalID, errors.Trace(err)
|
||||
}
|
||||
|
||||
// generalWorker returns the general worker.
|
||||
// It's used for testing.
|
||||
// TODO: Remove this function.
|
||||
func (d *ddl) generalWorker() *worker {
|
||||
return d.workers[generalWorker]
|
||||
}
|
||||
|
||||
// SchemaSyncer implements DDL.SchemaSyncer interface.
|
||||
func (d *ddl) SchemaSyncer() SchemaSyncer {
|
||||
return d.schemaSyncer
|
||||
|
||||
@ -56,6 +56,11 @@ func (d *ddl) SetInterceptoror(i Interceptor) {
|
||||
d.mu.interceptor = i
|
||||
}
|
||||
|
||||
// generalWorker returns the general worker.
|
||||
func (d *ddl) generalWorker() *worker {
|
||||
return d.workers[generalWorker]
|
||||
}
|
||||
|
||||
func TestT(t *testing.T) {
|
||||
CustomVerboseFlag = true
|
||||
logLevel := os.Getenv("log_level")
|
||||
|
||||
@ -16,6 +16,7 @@ package ddl
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/juju/errors"
|
||||
@ -32,8 +33,12 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// RunWorker indicates if this TiDB server starts DDL worker and can run DDL job.
|
||||
var RunWorker = true
|
||||
var (
|
||||
// RunWorker indicates if this TiDB server starts DDL worker and can run DDL job.
|
||||
RunWorker = true
|
||||
// ddlWorkerID is used for generating the next DDL worker ID.
|
||||
ddlWorkerID = int32(0)
|
||||
)
|
||||
|
||||
type workerType byte
|
||||
|
||||
@ -51,7 +56,7 @@ const (
|
||||
// worker is used for handling DDL jobs.
|
||||
// Now we have two kinds of workers.
|
||||
type worker struct {
|
||||
id int
|
||||
id int32
|
||||
tp workerType
|
||||
ddlJobCh chan struct{}
|
||||
quitCh chan struct{}
|
||||
@ -61,9 +66,9 @@ type worker struct {
|
||||
delRangeManager delRangeManager
|
||||
}
|
||||
|
||||
func newWorker(tp workerType, id int, store kv.Storage, ctxPool *pools.ResourcePool) *worker {
|
||||
func newWorker(tp workerType, store kv.Storage, ctxPool *pools.ResourcePool) *worker {
|
||||
worker := &worker{
|
||||
id: id,
|
||||
id: atomic.AddInt32(&ddlWorkerID, 1),
|
||||
tp: tp,
|
||||
ddlJobCh: make(chan struct{}, 1),
|
||||
quitCh: make(chan struct{}),
|
||||
|
||||
Reference in New Issue
Block a user