ddl: support modify the related reorg config by SQL (#57336)

ref pingcap/tidb#57229
fzzf678
2024-11-15 12:58:23 +08:00
committed by GitHub
parent 851af3587c
commit afacd100e8
10 changed files with 437 additions and 0 deletions
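
The statement added here lets an operator tune a running reorg DDL job's worker count, batch size, and max write speed. As a rough sketch of the syntax and of the AST the planner change below consumes (assuming the pkg/parser import paths used elsewhere in the repository; the job ID 123 and option values are placeholders, not taken from this diff):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/parser"
	"github.com/pingcap/tidb/pkg/parser/ast"
)

func main() {
	p := parser.New()
	// All three tunables can be combined in one statement; thread and
	// batch_size take integers, max_write_speed takes a size string such as '10MiB'.
	stmt, err := p.ParseOneStmt(
		"admin alter ddl jobs 123 thread = 8, batch_size = 256, max_write_speed = '10MiB'", "", "")
	if err != nil {
		panic(err)
	}
	admin := stmt.(*ast.AdminStmt)
	fmt.Println(admin.JobNumber) // 123
	for _, opt := range admin.AlterJobOptions {
		fmt.Println(opt.Name) // thread, batch_size, max_write_speed
	}
}

The plan builder further down only accepts the option names registered in allowedAlterDDLJobParams and range-checks thread and batch_size before producing an AlterDDLJob plan.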

View File

@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"math"
"strconv"
"strings"
"sync"
"testing"
@@ -1124,3 +1125,124 @@ func TestDDLJobErrEntrySizeTooLarge(t *testing.T) {
tk.MustExec("create table t1 (a int);")
tk.MustExec("alter table t add column b int;") // Should not block.
}
func insertMockJob2Table(tk *testkit.TestKit, job *model.Job) {
b, err := job.Encode(false)
tk.RequireNoError(err)
sql := fmt.Sprintf("insert into mysql.tidb_ddl_job(job_id, job_meta) values(%s, ?);",
strconv.FormatInt(job.ID, 10))
tk.MustExec(sql, b)
}
func getJobMetaByID(t *testing.T, tk *testkit.TestKit, jobID int64) *model.Job {
sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_job where job_id = %s",
strconv.FormatInt(jobID, 10))
rows := tk.MustQuery(sql)
res := rows.Rows()
require.Len(t, res, 1)
require.Len(t, res[0], 1)
jobBinary := []byte(res[0][0].(string))
job := model.Job{}
err := job.Decode(jobBinary)
require.NoError(t, err)
return &job
}
func deleteJobMetaByID(tk *testkit.TestKit, jobID int64) {
sql := fmt.Sprintf("delete from mysql.tidb_ddl_job where job_id = %s",
strconv.FormatInt(jobID, 10))
tk.MustExec(sql)
}
func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")
job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
}
insertMockJob2Table(tk, &job)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID))
j := getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.Concurrency, 8)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID))
j = getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.BatchSize, 256)
tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID))
j = getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta.Concurrency, 16)
require.Equal(t, j.ReorgMeta.BatchSize, 512)
deleteJobMetaByID(tk, job.ID)
}
func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")
// invalid config value
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 10240]")
tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]")
// invalid job id
tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running")
job := model.Job{
ID: 1,
Type: model.ActionAddColumn,
}
insertMockJob2Table(tk, &job)
// unsupported job type
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID),
"unsupported DDL operation: add column, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job")
deleteJobMetaByID(tk, 1)
job = model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
IsDistReorg: true,
},
}
insertMockJob2Table(tk, &job)
// unsupported job type
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID),
"unsupported DDL operation: add index, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job")
deleteJobMetaByID(tk, 1)
}
func TestAdminAlterDDLJobCommitFailed(t *testing.T) {
store := testkit.CreateMockStore(t)
tk := testkit.NewTestKit(t, store)
tk.MustExec("use test")
tk.MustExec("create table t (a int);")
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed", `return(true)`)
defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed")
job := model.Job{
ID: 1,
Type: model.ActionAddIndex,
ReorgMeta: &model.DDLReorgMeta{
Concurrency: 4,
BatchSize: 128,
},
}
insertMockJob2Table(tk, &job)
tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID),
"mock commit failed on admin alter ddl jobs")
j := getJobMetaByID(t, tk, job.ID)
require.Equal(t, j.ReorgMeta, job.ReorgMeta)
deleteJobMetaByID(tk, job.ID)
}

View File

@@ -102,6 +102,8 @@ go_library(
"//pkg/ddl/label",
"//pkg/ddl/placement",
"//pkg/ddl/schematracker",
"//pkg/ddl/session",
"//pkg/ddl/util",
"//pkg/distsql",
"//pkg/distsql/context",
"//pkg/disttask/framework/handle",

View File

@@ -222,6 +222,8 @@ func (b *executorBuilder) build(p base.Plan) exec.Executor {
return b.buildPauseDDLJobs(v)
case *plannercore.ResumeDDLJobs:
return b.buildResumeDDLJobs(v)
case *plannercore.AlterDDLJob:
return b.buildAlterDDLJob(v)
case *plannercore.ShowNextRowID:
return b.buildShowNextRowID(v)
case *plannercore.ShowDDL:
@@ -359,6 +361,15 @@ func (b *executorBuilder) buildResumeDDLJobs(v *plannercore.ResumeDDLJobs) exec.
return e
}
func (b *executorBuilder) buildAlterDDLJob(v *plannercore.AlterDDLJob) exec.Executor {
e := &AlterDDLJobExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
jobID: v.JobID,
AlterOpts: v.Options,
}
return e
}
func (b *executorBuilder) buildShowNextRowID(v *plannercore.ShowNextRowID) exec.Executor {
e := &ShowNextRowIDExec{
BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),

View File

@@ -19,8 +19,16 @@ import (
"fmt"
"strconv"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/ddl"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/ddl/util"
"github.com/pingcap/tidb/pkg/executor/internal/exec"
"github.com/pingcap/tidb/pkg/expression"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/planner/core"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util/chunk"
)
@@ -85,3 +93,126 @@ type PauseDDLJobsExec struct {
type ResumeDDLJobsExec struct {
*CommandDDLJobsExec
}
// AlterDDLJobExec indicates an Executor for altering the config of a DDL job.
type AlterDDLJobExec struct {
exec.BaseExecutor
jobID int64
AlterOpts []*core.AlterDDLJobOpt
}
// Open implements the Executor Open interface.
func (e *AlterDDLJobExec) Open(ctx context.Context) error {
newSess, err := e.GetSysSession()
if err != nil {
return err
}
defer e.ReleaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess)
return e.processAlterDDLJobConfig(ctx, newSess)
}
func getJobMetaFromTable(
ctx context.Context,
se *sess.Session,
jobID int64,
) (*model.Job, error) {
sql := fmt.Sprintf("select job_meta from mysql.%s where job_id = %s",
ddl.JobTable, strconv.FormatInt(jobID, 10))
rows, err := se.Execute(ctx, sql, "get_job_by_id")
if err != nil {
return nil, errors.Trace(err)
}
if len(rows) == 0 {
return nil, fmt.Errorf("ddl job %d is not running", jobID)
}
jobBinary := rows[0].GetBytes(0)
job := model.Job{}
err = job.Decode(jobBinary)
if err != nil {
return nil, errors.Trace(err)
}
return &job, nil
}
func updateJobMeta2Table(
ctx context.Context,
se *sess.Session,
job *model.Job,
) error {
b, err := job.Encode(false)
if err != nil {
return err
}
sql := fmt.Sprintf("update mysql.%s set job_meta = %s where job_id = %d",
ddl.JobTable, util.WrapKey2String(b), job.ID)
_, err = se.Execute(ctx, sql, "update_job")
return errors.Trace(err)
}
const alterDDLJobMaxRetryCnt = 3
// processAlterDDLJobConfig tries to alter the DDL job's reorg configs.
// On failure, it retries, making at most alterDDLJobMaxRetryCnt attempts in total.
func (e *AlterDDLJobExec) processAlterDDLJobConfig(
ctx context.Context,
sessCtx sessionctx.Context,
) (err error) {
ns := sess.NewSession(sessCtx)
var job *model.Job
for tryN := uint(0); tryN < alterDDLJobMaxRetryCnt; tryN++ {
if err = ns.Begin(ctx); err != nil {
continue
}
job, err = getJobMetaFromTable(ctx, ns, e.jobID)
if err != nil {
continue
}
if !job.IsAlterable() {
return fmt.Errorf("unsupported DDL operation: %s, "+
"only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job", job.Type.String())
}
if err = e.updateReorgMeta(job, model.AdminCommandByEndUser); err != nil {
continue
}
if err = updateJobMeta2Table(ctx, ns, job); err != nil {
continue
}
failpoint.Inject("mockAlterDDLJobCommitFailed", func(val failpoint.Value) {
if val.(bool) {
ns.Rollback()
failpoint.Return(errors.New("mock commit failed on admin alter ddl jobs"))
}
})
if err = ns.Commit(ctx); err != nil {
ns.Rollback()
continue
}
return nil
}
return err
}
func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error {
for _, opt := range e.AlterOpts {
switch opt.Name {
case core.AlterDDLJobThread:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.Concurrency = int(cons.Value.GetInt64())
}
job.AdminOperator = byWho
case core.AlterDDLJobBatchSize:
if opt.Value != nil {
cons := opt.Value.(*expression.Constant)
job.ReorgMeta.BatchSize = int(cons.Value.GetInt64())
}
job.AdminOperator = byWho
default:
return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
}
}
return nil
}

View File

@@ -640,6 +640,14 @@ func (job *Job) IsPausable() bool {
return job.NotStarted() || (job.IsRunning() && job.IsRollbackable())
}
// IsAlterable checks whether the job type can be altered.
func (job *Job) IsAlterable() bool {
// Currently, only non-distributed add index, modify column and reorganize partition reorg jobs can be altered
return job.Type == ActionAddIndex && !job.ReorgMeta.IsDistReorg ||
job.Type == ActionModifyColumn ||
job.Type == ActionReorganizePartition
}
// IsResumable checks whether the job can be resumed.
func (job *Job) IsResumable() bool {
return job.IsPaused()
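
The IsAlterable predicate added above is what the executor consults before rewriting the job meta. A minimal standalone sketch of how it evaluates for the supported job types, using only the meta/model package already imported in the executor change (values are illustrative):

package main

import (
	"fmt"

	"github.com/pingcap/tidb/pkg/meta/model"
)

func main() {
	// Non-distributed add-index reorg: alterable.
	local := &model.Job{Type: model.ActionAddIndex, ReorgMeta: &model.DDLReorgMeta{IsDistReorg: false}}
	fmt.Println(local.IsAlterable()) // true

	// Distributed (dist task) add-index reorg: not alterable.
	dist := &model.Job{Type: model.ActionAddIndex, ReorgMeta: &model.DDLReorgMeta{IsDistReorg: true}}
	fmt.Println(dist.IsAlterable()) // false

	// Modify column and reorganize partition reorg jobs are alterable too.
	fmt.Println((&model.Job{Type: model.ActionModifyColumn}).IsAlterable())        // true
	fmt.Println((&model.Job{Type: model.ActionReorganizePartition}).IsAlterable()) // true
}

Distributed add-index jobs are rejected with the "unsupported DDL operation" error exercised in TestAdminAlterDDLJobUnsupportedCases above.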

View File

@@ -148,6 +148,35 @@ type ResumeDDLJobs struct {
JobIDs []int64
}
const (
// AlterDDLJobThread alter reorg worker count
AlterDDLJobThread = "thread"
// AlterDDLJobBatchSize alter reorg batch size
AlterDDLJobBatchSize = "batch_size"
// AlterDDLJobMaxWriteSpeed alter reorg max write speed
AlterDDLJobMaxWriteSpeed = "max_write_speed"
)
var allowedAlterDDLJobParams = map[string]struct{}{
AlterDDLJobThread: {},
AlterDDLJobBatchSize: {},
AlterDDLJobMaxWriteSpeed: {},
}
// AlterDDLJobOpt represents alter ddl job option.
type AlterDDLJobOpt struct {
Name string
Value expression.Expression
}
// AlterDDLJob is the plan of admin alter ddl job
type AlterDDLJob struct {
baseSchemaProducer
JobID int64
Options []*AlterDDLJobOpt
}
// ReloadExprPushdownBlacklist reloads the data from expr_pushdown_blacklist table.
type ReloadExprPushdownBlacklist struct {
baseSchemaProducer

View File

@@ -1576,6 +1576,11 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (base.P
p := &AdminShowBDRRole{}
p.setSchemaAndNames(buildAdminShowBDRRoleFields())
ret = p
case ast.AdminAlterDDLJob:
ret, err = b.buildAdminAlterDDLJob(ctx, as)
if err != nil {
return nil, err
}
default:
return nil, plannererrors.ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
}
@@ -5858,3 +5863,52 @@
}
return nil
}
func (b *PlanBuilder) buildAdminAlterDDLJob(ctx context.Context, as *ast.AdminStmt) (_ base.Plan, err error) {
options := make([]*AlterDDLJobOpt, 0, len(as.AlterJobOptions))
optionNames := make([]string, 0, len(as.AlterJobOptions))
mockTablePlan := logicalop.LogicalTableDual{}.Init(b.ctx, b.getSelectOffset())
for _, opt := range as.AlterJobOptions {
_, ok := allowedAlterDDLJobParams[opt.Name]
if !ok {
return nil, fmt.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name)
}
alterDDLJobOpt := AlterDDLJobOpt{Name: opt.Name}
if opt.Value != nil {
alterDDLJobOpt.Value, _, err = b.rewrite(ctx, opt.Value, mockTablePlan, nil, true)
if err != nil {
return nil, err
}
}
if err = checkAlterDDLJobOptValue(&alterDDLJobOpt); err != nil {
return nil, err
}
options = append(options, &alterDDLJobOpt)
optionNames = append(optionNames, opt.Name)
}
p := &AlterDDLJob{
JobID: as.JobNumber,
Options: options,
}
return p, nil
}
// checkAlterDDLJobOptValue checks if the config value is valid.
func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error {
switch opt.Name {
case AlterDDLJobThread:
thread := opt.Value.(*expression.Constant).Value.GetInt64()
if thread < 1 || thread > variable.MaxConfigurableConcurrency {
return fmt.Errorf("the value %v for %s is out of range [1, %v]",
thread, opt.Name, variable.MaxConfigurableConcurrency)
}
case AlterDDLJobBatchSize:
batchSize := opt.Value.(*expression.Constant).Value.GetInt64()
bs := int32(batchSize)
if bs < variable.MinDDLReorgBatchSize || bs > variable.MaxDDLReorgBatchSize {
return fmt.Errorf("the value %v for %s is out of range [%v, %v]",
bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize)
}
}
return nil
}

View File

@@ -18,6 +18,7 @@ import (
"context"
"fmt"
"reflect"
"sort"
"strings"
"testing"
"unsafe"
@@ -877,3 +878,78 @@ func TestImportIntoCollAssignmentChecker(t *testing.T) {
})
}
}
func TestBuildAdminAlterDDLJobPlan(t *testing.T) {
parser := parser.New()
sctx := MockContext()
ctx := context.TODO()
builder, _ := NewPlanBuilder().Init(sctx, nil, hint.NewQBHintHandler(nil))
stmt, err := parser.ParseOneStmt("admin alter ddl jobs 1 thread = 16 ", "", "")
require.NoError(t, err)
p, err := builder.Build(ctx, resolve.NewNodeW(stmt))
require.NoError(t, err)
plan, ok := p.(*AlterDDLJob)
require.True(t, ok)
require.Equal(t, plan.JobID, int64(1))
require.Equal(t, len(plan.Options), 1)
require.Equal(t, plan.Options[0].Name, AlterDDLJobThread)
cons, ok := plan.Options[0].Value.(*expression.Constant)
require.True(t, ok)
require.Equal(t, cons.Value.GetInt64(), int64(16))
stmt, err = parser.ParseOneStmt("admin alter ddl jobs 2 batch_size = 512 ", "", "")
require.NoError(t, err)
p, err = builder.Build(ctx, resolve.NewNodeW(stmt))
require.NoError(t, err)
plan, ok = p.(*AlterDDLJob)
require.True(t, ok)
require.Equal(t, plan.JobID, int64(2))
require.Equal(t, len(plan.Options), 1)
require.Equal(t, plan.Options[0].Name, AlterDDLJobBatchSize)
cons, ok = plan.Options[0].Value.(*expression.Constant)
require.True(t, ok)
require.Equal(t, cons.Value.GetInt64(), int64(512))
stmt, err = parser.ParseOneStmt("admin alter ddl jobs 3 max_write_speed = '10MiB' ", "", "")
require.NoError(t, err)
p, err = builder.Build(ctx, resolve.NewNodeW(stmt))
require.NoError(t, err)
plan, ok = p.(*AlterDDLJob)
require.True(t, ok)
require.Equal(t, plan.JobID, int64(3))
require.Equal(t, len(plan.Options), 1)
require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed)
cons, ok = plan.Options[0].Value.(*expression.Constant)
require.True(t, ok)
require.Equal(t, cons.Value.GetString(), "10MiB")
stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "")
require.NoError(t, err)
p, err = builder.Build(ctx, resolve.NewNodeW(stmt))
require.NoError(t, err)
plan, ok = p.(*AlterDDLJob)
require.True(t, ok)
require.Equal(t, plan.JobID, int64(4))
require.Equal(t, len(plan.Options), 3)
sort.Slice(plan.Options, func(i, j int) bool {
return plan.Options[i].Name < plan.Options[j].Name
})
require.Equal(t, plan.Options[0].Name, AlterDDLJobBatchSize)
cons, ok = plan.Options[0].Value.(*expression.Constant)
require.True(t, ok)
require.Equal(t, cons.Value.GetInt64(), int64(512))
require.Equal(t, plan.Options[1].Name, AlterDDLJobMaxWriteSpeed)
cons, ok = plan.Options[1].Value.(*expression.Constant)
require.True(t, ok)
require.Equal(t, cons.Value.GetString(), "10MiB")
require.Equal(t, plan.Options[2].Name, AlterDDLJobThread)
cons, ok = plan.Options[2].Value.(*expression.Constant)
require.True(t, ok)
require.Equal(t, cons.Value.GetInt64(), int64(16))
stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 aaa = 16", "", "")
require.NoError(t, err)
_, err = builder.Build(ctx, resolve.NewNodeW(stmt))
require.Equal(t, err.Error(), "unsupported admin alter ddl jobs config: aaa")
}

View File

@@ -645,3 +645,5 @@ ADMIN SHOW SLOW RECENT 3;
Error 8121 (HY000): privilege check for 'Super' fail
ADMIN SHOW SLOW TOP ALL 3;
Error 8121 (HY000): privilege check for 'Super' fail
ADMIN ALTER DDL JOBS 10 THREAD = 3, BATCH_SIZE = 100, MAX_WRITE_SPEED = '10MiB';
Error 8121 (HY000): privilege check for 'Super' fail

View File

@@ -866,6 +866,8 @@ ADMIN SHOW privilege__privileges.admin NEXT_ROW_ID;
ADMIN SHOW SLOW RECENT 3;
-- error 8121
ADMIN SHOW SLOW TOP ALL 3;
-- error 8121
ADMIN ALTER DDL JOBS 10 THREAD = 3, BATCH_SIZE = 100, MAX_WRITE_SPEED = '10MiB';
disconnect without_super;
connection default;