From afacd100e89f94341f0faa7bca2f4ec6d24ee41e Mon Sep 17 00:00:00 2001 From: fzzf678 <108643977+fzzf678@users.noreply.github.com> Date: Fri, 15 Nov 2024 12:58:23 +0800 Subject: [PATCH] ddl: support modify the related reorg config by SQL (#57336) ref pingcap/tidb#57229 --- pkg/ddl/db_test.go | 122 ++++++++++++++++ pkg/executor/BUILD.bazel | 2 + pkg/executor/builder.go | 11 ++ pkg/executor/operate_ddl_jobs.go | 131 ++++++++++++++++++ pkg/meta/model/job.go | 8 ++ pkg/planner/core/common_plans.go | 29 ++++ pkg/planner/core/planbuilder.go | 54 ++++++++ pkg/planner/core/planbuilder_test.go | 76 ++++++++++ .../r/privilege/privileges.result | 2 + .../t/privilege/privileges.test | 2 + 10 files changed, 437 insertions(+) diff --git a/pkg/ddl/db_test.go b/pkg/ddl/db_test.go index 4898e0322f..2cd989ce11 100644 --- a/pkg/ddl/db_test.go +++ b/pkg/ddl/db_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "math" + "strconv" "strings" "sync" "testing" @@ -1124,3 +1125,124 @@ func TestDDLJobErrEntrySizeTooLarge(t *testing.T) { tk.MustExec("create table t1 (a int);") tk.MustExec("alter table t add column b int;") // Should not block. } + +func insertMockJob2Table(tk *testkit.TestKit, job *model.Job) { + b, err := job.Encode(false) + tk.RequireNoError(err) + sql := fmt.Sprintf("insert into mysql.tidb_ddl_job(job_id, job_meta) values(%s, ?);", + strconv.FormatInt(job.ID, 10)) + tk.MustExec(sql, b) +} + +func getJobMetaByID(t *testing.T, tk *testkit.TestKit, jobID int64) *model.Job { + sql := fmt.Sprintf("select job_meta from mysql.tidb_ddl_job where job_id = %s", + strconv.FormatInt(jobID, 10)) + rows := tk.MustQuery(sql) + res := rows.Rows() + require.Len(t, res, 1) + require.Len(t, res[0], 1) + jobBinary := []byte(res[0][0].(string)) + job := model.Job{} + err := job.Decode(jobBinary) + require.NoError(t, err) + return &job +} + +func deleteJobMetaByID(tk *testkit.TestKit, jobID int64) { + sql := fmt.Sprintf("delete from mysql.tidb_ddl_job where job_id = %s", + strconv.FormatInt(jobID, 10)) + tk.MustExec(sql) +} + +func TestAdminAlterDDLJobUpdateSysTable(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int);") + + job := model.Job{ + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{ + Concurrency: 4, + BatchSize: 128, + }, + } + insertMockJob2Table(tk, &job) + tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID)) + j := getJobMetaByID(t, tk, job.ID) + require.Equal(t, j.ReorgMeta.Concurrency, 8) + + tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d batch_size = 256;", job.ID)) + j = getJobMetaByID(t, tk, job.ID) + require.Equal(t, j.ReorgMeta.BatchSize, 256) + + tk.MustExec(fmt.Sprintf("admin alter ddl jobs %d thread = 16, batch_size = 512;", job.ID)) + j = getJobMetaByID(t, tk, job.ID) + require.Equal(t, j.ReorgMeta.Concurrency, 16) + require.Equal(t, j.ReorgMeta.BatchSize, 512) + deleteJobMetaByID(tk, job.ID) +} + +func TestAdminAlterDDLJobUnsupportedCases(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int);") + + // invalid config value + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 0;", "the value 0 for thread is out of range [1, 256]") + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 257;", "the value 257 for thread is out of range [1, 256]") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 31;", "the value 31 for batch_size is out of range [32, 
10240]") + tk.MustGetErrMsg("admin alter ddl jobs 1 batch_size = 10241;", "the value 10241 for batch_size is out of range [32, 10240]") + + // invalid job id + tk.MustGetErrMsg("admin alter ddl jobs 1 thread = 8;", "ddl job 1 is not running") + + job := model.Job{ + ID: 1, + Type: model.ActionAddColumn, + } + insertMockJob2Table(tk, &job) + // unsupported job type + tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID), + "unsupported DDL operation: add column, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job") + deleteJobMetaByID(tk, 1) + + job = model.Job{ + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{ + IsDistReorg: true, + }, + } + insertMockJob2Table(tk, &job) + // unsupported job type + tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8;", job.ID), + "unsupported DDL operation: add index, only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job") + deleteJobMetaByID(tk, 1) +} + +func TestAdminAlterDDLJobCommitFailed(t *testing.T) { + store := testkit.CreateMockStore(t) + tk := testkit.NewTestKit(t, store) + tk.MustExec("use test") + tk.MustExec("create table t (a int);") + testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed", `return(true)`) + defer testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/executor/mockAlterDDLJobCommitFailed") + + job := model.Job{ + ID: 1, + Type: model.ActionAddIndex, + ReorgMeta: &model.DDLReorgMeta{ + Concurrency: 4, + BatchSize: 128, + }, + } + insertMockJob2Table(tk, &job) + tk.MustGetErrMsg(fmt.Sprintf("admin alter ddl jobs %d thread = 8, batch_size = 256;", job.ID), + "mock commit failed on admin alter ddl jobs") + j := getJobMetaByID(t, tk, job.ID) + require.Equal(t, j.ReorgMeta, job.ReorgMeta) + deleteJobMetaByID(tk, job.ID) +} diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 035f718469..83b3bc777f 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -102,6 +102,8 @@ go_library( "//pkg/ddl/label", "//pkg/ddl/placement", "//pkg/ddl/schematracker", + "//pkg/ddl/session", + "//pkg/ddl/util", "//pkg/distsql", "//pkg/distsql/context", "//pkg/disttask/framework/handle", diff --git a/pkg/executor/builder.go b/pkg/executor/builder.go index 80012c8e2e..801ee4d346 100644 --- a/pkg/executor/builder.go +++ b/pkg/executor/builder.go @@ -222,6 +222,8 @@ func (b *executorBuilder) build(p base.Plan) exec.Executor { return b.buildPauseDDLJobs(v) case *plannercore.ResumeDDLJobs: return b.buildResumeDDLJobs(v) + case *plannercore.AlterDDLJob: + return b.buildAlterDDLJob(v) case *plannercore.ShowNextRowID: return b.buildShowNextRowID(v) case *plannercore.ShowDDL: @@ -359,6 +361,15 @@ func (b *executorBuilder) buildResumeDDLJobs(v *plannercore.ResumeDDLJobs) exec. 
 	return e
 }
 
+func (b *executorBuilder) buildAlterDDLJob(v *plannercore.AlterDDLJob) exec.Executor {
+	e := &AlterDDLJobExec{
+		BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
+		jobID:        v.JobID,
+		AlterOpts:    v.Options,
+	}
+	return e
+}
+
 func (b *executorBuilder) buildShowNextRowID(v *plannercore.ShowNextRowID) exec.Executor {
 	e := &ShowNextRowIDExec{
 		BaseExecutor: exec.NewBaseExecutor(b.ctx, v.Schema(), v.ID()),
diff --git a/pkg/executor/operate_ddl_jobs.go b/pkg/executor/operate_ddl_jobs.go
index 5bb04fb900..4aa14405aa 100644
--- a/pkg/executor/operate_ddl_jobs.go
+++ b/pkg/executor/operate_ddl_jobs.go
@@ -19,8 +19,16 @@ import (
 	"fmt"
 	"strconv"
 
+	"github.com/pingcap/errors"
+	"github.com/pingcap/failpoint"
+	"github.com/pingcap/tidb/pkg/ddl"
+	sess "github.com/pingcap/tidb/pkg/ddl/session"
+	"github.com/pingcap/tidb/pkg/ddl/util"
 	"github.com/pingcap/tidb/pkg/executor/internal/exec"
+	"github.com/pingcap/tidb/pkg/expression"
 	"github.com/pingcap/tidb/pkg/kv"
+	"github.com/pingcap/tidb/pkg/meta/model"
+	"github.com/pingcap/tidb/pkg/planner/core"
 	"github.com/pingcap/tidb/pkg/sessionctx"
 	"github.com/pingcap/tidb/pkg/util/chunk"
 )
@@ -85,3 +93,126 @@ type PauseDDLJobsExec struct {
 type ResumeDDLJobsExec struct {
 	*CommandDDLJobsExec
 }
+
+// AlterDDLJobExec indicates an Executor for altering the config of a DDL job.
+type AlterDDLJobExec struct {
+	exec.BaseExecutor
+	jobID     int64
+	AlterOpts []*core.AlterDDLJobOpt
+}
+
+// Open implements the Executor Open interface.
+func (e *AlterDDLJobExec) Open(ctx context.Context) error {
+	newSess, err := e.GetSysSession()
+	if err != nil {
+		return err
+	}
+	defer e.ReleaseSysSession(kv.WithInternalSourceType(context.Background(), kv.InternalTxnDDL), newSess)
+
+	return e.processAlterDDLJobConfig(ctx, newSess)
+}
+
+func getJobMetaFromTable(
+	ctx context.Context,
+	se *sess.Session,
+	jobID int64,
+) (*model.Job, error) {
+	sql := fmt.Sprintf("select job_meta from mysql.%s where job_id = %s",
+		ddl.JobTable, strconv.FormatInt(jobID, 10))
+	rows, err := se.Execute(ctx, sql, "get_job_by_id")
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	if len(rows) == 0 {
+		return nil, fmt.Errorf("ddl job %d is not running", jobID)
+	}
+	jobBinary := rows[0].GetBytes(0)
+	job := model.Job{}
+	err = job.Decode(jobBinary)
+	if err != nil {
+		return nil, errors.Trace(err)
+	}
+	return &job, nil
+}
+
+func updateJobMeta2Table(
+	ctx context.Context,
+	se *sess.Session,
+	job *model.Job,
+) error {
+	b, err := job.Encode(false)
+	if err != nil {
+		return err
+	}
+	sql := fmt.Sprintf("update mysql.%s set job_meta = %s where job_id = %d",
+		ddl.JobTable, util.WrapKey2String(b), job.ID)
+	_, err = se.Execute(ctx, sql, "update_job")
+	return errors.Trace(err)
+}
+
+const alterDDLJobMaxRetryCnt = 3
+
+// processAlterDDLJobConfig tries to alter the ddl job configs.
+// In case of failure, it will retry alterDDLJobMaxRetryCnt times.
+func (e *AlterDDLJobExec) processAlterDDLJobConfig( + ctx context.Context, + sessCtx sessionctx.Context, +) (err error) { + ns := sess.NewSession(sessCtx) + var job *model.Job + for tryN := uint(0); tryN < alterDDLJobMaxRetryCnt; tryN++ { + if err = ns.Begin(ctx); err != nil { + continue + } + job, err = getJobMetaFromTable(ctx, ns, e.jobID) + if err != nil { + continue + } + if !job.IsAlterable() { + return fmt.Errorf("unsupported DDL operation: %s, "+ + "only support add index(tidb_enable_dist_task=off), modify column and alter table reorganize partition DDL job", job.Type.String()) + } + if err = e.updateReorgMeta(job, model.AdminCommandByEndUser); err != nil { + continue + } + if err = updateJobMeta2Table(ctx, ns, job); err != nil { + continue + } + + failpoint.Inject("mockAlterDDLJobCommitFailed", func(val failpoint.Value) { + if val.(bool) { + ns.Rollback() + failpoint.Return(errors.New("mock commit failed on admin alter ddl jobs")) + } + }) + + if err = ns.Commit(ctx); err != nil { + ns.Rollback() + continue + } + return nil + } + return err +} + +func (e *AlterDDLJobExec) updateReorgMeta(job *model.Job, byWho model.AdminCommandOperator) error { + for _, opt := range e.AlterOpts { + switch opt.Name { + case core.AlterDDLJobThread: + if opt.Value != nil { + cons := opt.Value.(*expression.Constant) + job.ReorgMeta.Concurrency = int(cons.Value.GetInt64()) + } + job.AdminOperator = byWho + case core.AlterDDLJobBatchSize: + if opt.Value != nil { + cons := opt.Value.(*expression.Constant) + job.ReorgMeta.BatchSize = int(cons.Value.GetInt64()) + } + job.AdminOperator = byWho + default: + return errors.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name) + } + } + return nil +} diff --git a/pkg/meta/model/job.go b/pkg/meta/model/job.go index c87f8a094a..77dc8599d9 100644 --- a/pkg/meta/model/job.go +++ b/pkg/meta/model/job.go @@ -640,6 +640,14 @@ func (job *Job) IsPausable() bool { return job.NotStarted() || (job.IsRunning() && job.IsRollbackable()) } +// IsAlterable checks whether the job type can be altered. +func (job *Job) IsAlterable() bool { + // Currently, only non-distributed add index reorg task can be altered + return job.Type == ActionAddIndex && !job.ReorgMeta.IsDistReorg || + job.Type == ActionModifyColumn || + job.Type == ActionReorganizePartition +} + // IsResumable checks whether the job can be rollback. func (job *Job) IsResumable() bool { return job.IsPaused() diff --git a/pkg/planner/core/common_plans.go b/pkg/planner/core/common_plans.go index 973577930f..f8fff92864 100644 --- a/pkg/planner/core/common_plans.go +++ b/pkg/planner/core/common_plans.go @@ -148,6 +148,35 @@ type ResumeDDLJobs struct { JobIDs []int64 } +const ( + // AlterDDLJobThread alter reorg worker count + AlterDDLJobThread = "thread" + // AlterDDLJobBatchSize alter reorg batch size + AlterDDLJobBatchSize = "batch_size" + // AlterDDLJobMaxWriteSpeed alter reorg max write speed + AlterDDLJobMaxWriteSpeed = "max_write_speed" +) + +var allowedAlterDDLJobParams = map[string]struct{}{ + AlterDDLJobThread: {}, + AlterDDLJobBatchSize: {}, + AlterDDLJobMaxWriteSpeed: {}, +} + +// AlterDDLJobOpt represents alter ddl job option. +type AlterDDLJobOpt struct { + Name string + Value expression.Expression +} + +// AlterDDLJob is the plan of admin alter ddl job +type AlterDDLJob struct { + baseSchemaProducer + + JobID int64 + Options []*AlterDDLJobOpt +} + // ReloadExprPushdownBlacklist reloads the data from expr_pushdown_blacklist table. 
type ReloadExprPushdownBlacklist struct { baseSchemaProducer diff --git a/pkg/planner/core/planbuilder.go b/pkg/planner/core/planbuilder.go index 591c5420b7..fba6e19c89 100644 --- a/pkg/planner/core/planbuilder.go +++ b/pkg/planner/core/planbuilder.go @@ -1576,6 +1576,11 @@ func (b *PlanBuilder) buildAdmin(ctx context.Context, as *ast.AdminStmt) (base.P p := &AdminShowBDRRole{} p.setSchemaAndNames(buildAdminShowBDRRoleFields()) ret = p + case ast.AdminAlterDDLJob: + ret, err = b.buildAdminAlterDDLJob(ctx, as) + if err != nil { + return nil, err + } default: return nil, plannererrors.ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as) } @@ -5858,3 +5863,52 @@ func getTablePath(paths []*util.AccessPath) *util.AccessPath { } return nil } + +func (b *PlanBuilder) buildAdminAlterDDLJob(ctx context.Context, as *ast.AdminStmt) (_ base.Plan, err error) { + options := make([]*AlterDDLJobOpt, 0, len(as.AlterJobOptions)) + optionNames := make([]string, 0, len(as.AlterJobOptions)) + mockTablePlan := logicalop.LogicalTableDual{}.Init(b.ctx, b.getSelectOffset()) + for _, opt := range as.AlterJobOptions { + _, ok := allowedAlterDDLJobParams[opt.Name] + if !ok { + return nil, fmt.Errorf("unsupported admin alter ddl jobs config: %s", opt.Name) + } + alterDDLJobOpt := AlterDDLJobOpt{Name: opt.Name} + if opt.Value != nil { + alterDDLJobOpt.Value, _, err = b.rewrite(ctx, opt.Value, mockTablePlan, nil, true) + if err != nil { + return nil, err + } + } + if err = checkAlterDDLJobOptValue(&alterDDLJobOpt); err != nil { + return nil, err + } + options = append(options, &alterDDLJobOpt) + optionNames = append(optionNames, opt.Name) + } + p := &AlterDDLJob{ + JobID: as.JobNumber, + Options: options, + } + return p, nil +} + +// check if the config value is valid. 
+func checkAlterDDLJobOptValue(opt *AlterDDLJobOpt) error { + switch opt.Name { + case AlterDDLJobThread: + thread := opt.Value.(*expression.Constant).Value.GetInt64() + if thread < 1 || thread > variable.MaxConfigurableConcurrency { + return fmt.Errorf("the value %v for %s is out of range [1, %v]", + thread, opt.Name, variable.MaxConfigurableConcurrency) + } + case AlterDDLJobBatchSize: + batchSize := opt.Value.(*expression.Constant).Value.GetInt64() + bs := int32(batchSize) + if bs < variable.MinDDLReorgBatchSize || bs > variable.MaxDDLReorgBatchSize { + return fmt.Errorf("the value %v for %s is out of range [%v, %v]", + bs, opt.Name, variable.MinDDLReorgBatchSize, variable.MaxDDLReorgBatchSize) + } + } + return nil +} diff --git a/pkg/planner/core/planbuilder_test.go b/pkg/planner/core/planbuilder_test.go index f52c275191..d2c416960b 100644 --- a/pkg/planner/core/planbuilder_test.go +++ b/pkg/planner/core/planbuilder_test.go @@ -18,6 +18,7 @@ import ( "context" "fmt" "reflect" + "sort" "strings" "testing" "unsafe" @@ -877,3 +878,78 @@ func TestImportIntoCollAssignmentChecker(t *testing.T) { }) } } + +func TestBuildAdminAlterDDLJobPlan(t *testing.T) { + parser := parser.New() + sctx := MockContext() + ctx := context.TODO() + builder, _ := NewPlanBuilder().Init(sctx, nil, hint.NewQBHintHandler(nil)) + + stmt, err := parser.ParseOneStmt("admin alter ddl jobs 1 thread = 16 ", "", "") + require.NoError(t, err) + p, err := builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok := p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(1)) + require.Equal(t, len(plan.Options), 1) + require.Equal(t, plan.Options[0].Name, AlterDDLJobThread) + cons, ok := plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, cons.Value.GetInt64(), int64(16)) + + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 2 batch_size = 512 ", "", "") + require.NoError(t, err) + p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok = p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(2)) + require.Equal(t, len(plan.Options), 1) + require.Equal(t, plan.Options[0].Name, AlterDDLJobBatchSize) + cons, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, cons.Value.GetInt64(), int64(512)) + + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 3 max_write_speed = '10MiB' ", "", "") + require.NoError(t, err) + p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok = p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(3)) + require.Equal(t, len(plan.Options), 1) + require.Equal(t, plan.Options[0].Name, AlterDDLJobMaxWriteSpeed) + cons, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, cons.Value.GetString(), "10MiB") + + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 thread = 16, batch_size = 512, max_write_speed = '10MiB' ", "", "") + require.NoError(t, err) + p, err = builder.Build(ctx, resolve.NewNodeW(stmt)) + require.NoError(t, err) + plan, ok = p.(*AlterDDLJob) + require.True(t, ok) + require.Equal(t, plan.JobID, int64(4)) + require.Equal(t, len(plan.Options), 3) + sort.Slice(plan.Options, func(i, j int) bool { + return plan.Options[i].Name < plan.Options[j].Name + }) + require.Equal(t, plan.Options[0].Name, AlterDDLJobBatchSize) + cons, ok = plan.Options[0].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, 
cons.Value.GetInt64(), int64(512)) + require.Equal(t, plan.Options[1].Name, AlterDDLJobMaxWriteSpeed) + cons, ok = plan.Options[1].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, cons.Value.GetString(), "10MiB") + require.Equal(t, plan.Options[2].Name, AlterDDLJobThread) + cons, ok = plan.Options[2].Value.(*expression.Constant) + require.True(t, ok) + require.Equal(t, cons.Value.GetInt64(), int64(16)) + + stmt, err = parser.ParseOneStmt("admin alter ddl jobs 4 aaa = 16", "", "") + require.NoError(t, err) + _, err = builder.Build(ctx, resolve.NewNodeW(stmt)) + require.Equal(t, err.Error(), "unsupported admin alter ddl jobs config: aaa") +} diff --git a/tests/integrationtest/r/privilege/privileges.result b/tests/integrationtest/r/privilege/privileges.result index 8e81e8e4fc..d42b1ca3d4 100644 --- a/tests/integrationtest/r/privilege/privileges.result +++ b/tests/integrationtest/r/privilege/privileges.result @@ -645,3 +645,5 @@ ADMIN SHOW SLOW RECENT 3; Error 8121 (HY000): privilege check for 'Super' fail ADMIN SHOW SLOW TOP ALL 3; Error 8121 (HY000): privilege check for 'Super' fail +ADMIN ALTER DDL JOBS 10 THREAD = 3, BATCH_SIZE = 100, MAX_WRITE_SPEED = '10MiB'; +Error 8121 (HY000): privilege check for 'Super' fail diff --git a/tests/integrationtest/t/privilege/privileges.test b/tests/integrationtest/t/privilege/privileges.test index 3dc1f24d76..ad3b1a7bbb 100644 --- a/tests/integrationtest/t/privilege/privileges.test +++ b/tests/integrationtest/t/privilege/privileges.test @@ -866,6 +866,8 @@ ADMIN SHOW privilege__privileges.admin NEXT_ROW_ID; ADMIN SHOW SLOW RECENT 3; -- error 8121 ADMIN SHOW SLOW TOP ALL 3; +-- error 8121 +ADMIN ALTER DDL JOBS 10 THREAD = 3, BATCH_SIZE = 100, MAX_WRITE_SPEED = '10MiB'; disconnect without_super; connection default;
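
Example usage of the statement introduced by this patch (a sketch based on the syntax, limits, and privilege checks exercised in the tests above; the job ID 123 is hypothetical and must refer to a DDL job recorded in mysql.tidb_ddl_job):

    ADMIN ALTER DDL JOBS 123 THREAD = 8;
    ADMIN ALTER DDL JOBS 123 BATCH_SIZE = 256;
    ADMIN ALTER DDL JOBS 123 THREAD = 16, BATCH_SIZE = 512;

Per the validation added in planbuilder.go, THREAD must be in [1, 256] and BATCH_SIZE in [32, 10240]; per Job.IsAlterable, the target job must be an ADD INDEX job with tidb_enable_dist_task=off, a MODIFY COLUMN job, or an ALTER TABLE ... REORGANIZE PARTITION job; and per the privilege tests, the statement requires the Super privilege.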