242 lines
11 KiB
Go
242 lines
11 KiB
Go
// Copyright 2022 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package ddl_test
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"testing"
|
|
"time"
|
|
|
|
"github.com/pingcap/failpoint"
|
|
"github.com/pingcap/tidb/pkg/domain/infosync"
|
|
"github.com/pingcap/tidb/pkg/errno"
|
|
"github.com/pingcap/tidb/pkg/meta/model"
|
|
"github.com/pingcap/tidb/pkg/sessionctx/vardef"
|
|
"github.com/pingcap/tidb/pkg/testkit"
|
|
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
|
|
"github.com/pingcap/tidb/pkg/types"
|
|
"github.com/pingcap/tidb/pkg/util/dbterror"
|
|
"github.com/stretchr/testify/assert"
|
|
"github.com/stretchr/testify/require"
|
|
"github.com/tikv/client-go/v2/oracle"
|
|
)
|
|
|
|
func TestFlashbackCloseAndResetPDSchedule(t *testing.T) {
|
|
store := testkit.CreateMockStore(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
|
|
injectSafeTS := oracle.GoTimeToTS(time.Now().Add(10 * time.Second))
|
|
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`)
|
|
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/injectSafeTS",
|
|
fmt.Sprintf("return(%v)", injectSafeTS))
|
|
|
|
oldValue := map[string]any{
|
|
"merge-schedule-limit": 1,
|
|
}
|
|
require.NoError(t, infosync.SetPDScheduleConfig(context.Background(), oldValue))
|
|
|
|
timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
|
|
defer resetGC()
|
|
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
|
|
|
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
|
|
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
|
|
if job.SchemaState == model.StateWriteReorganization {
|
|
closeValue, err := infosync.GetPDScheduleConfig(context.Background())
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, closeValue["merge-schedule-limit"], 0)
|
|
}
|
|
})
|
|
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterRunOneJobStep", func(job *model.Job) {
|
|
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
|
|
if job.SchemaState == model.StateWriteReorganization {
|
|
// cancel flashback job
|
|
job.State = model.JobStateCancelled
|
|
job.Error = dbterror.ErrCancelledDDLJob
|
|
}
|
|
})
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
|
|
require.NoError(t, err)
|
|
|
|
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)), errno.ErrCancelledDDLJob)
|
|
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep")
|
|
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/afterRunOneJobStep")
|
|
|
|
finishValue, err := infosync.GetPDScheduleConfig(context.Background())
|
|
require.NoError(t, err)
|
|
require.EqualValues(t, finishValue["merge-schedule-limit"], 1)
|
|
}
|
|
|
|
func TestAddDDLDuringFlashback(t *testing.T) {
|
|
store := testkit.CreateMockStore(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("create table t(a int)")
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
|
|
require.NoError(t, err)
|
|
|
|
injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS",
|
|
fmt.Sprintf("return(%v)", injectSafeTS)))
|
|
|
|
timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
|
|
defer resetGC()
|
|
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
|
|
|
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
|
|
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
|
|
if job.SchemaState == model.StateWriteOnly {
|
|
tk1 := testkit.NewTestKit(t, store)
|
|
_, err := tk1.Exec("alter table test.t add column b int")
|
|
assert.ErrorContains(t, err, "Can't add ddl job, have flashback cluster job")
|
|
}
|
|
})
|
|
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)))
|
|
|
|
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep")
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS"))
|
|
}
|
|
|
|
func TestGlobalVariablesOnFlashback(t *testing.T) {
|
|
store := testkit.CreateMockStore(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
tk.MustExec("create table t(a int)")
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
|
|
require.NoError(t, err)
|
|
|
|
injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS",
|
|
fmt.Sprintf("return(%v)", injectSafeTS)))
|
|
|
|
timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
|
|
defer resetGC()
|
|
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
|
|
|
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", func(job *model.Job) {
|
|
assert.Equal(t, model.ActionFlashbackCluster, job.Type)
|
|
if job.SchemaState == model.StateWriteReorganization {
|
|
tk := testkit.NewTestKit(t, store)
|
|
tk.MustExec("use test")
|
|
rs, err := tk.Exec("show variables like 'tidb_gc_enable'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
rs, err = tk.Exec("show variables like 'tidb_enable_auto_analyze'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.On)
|
|
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
}
|
|
})
|
|
// first try with `tidb_gc_enable` = on and `tidb_super_read_only` = off and `tidb_ttl_job_enable` = on
|
|
tk.MustExec("set global tidb_gc_enable = on")
|
|
tk.MustExec("set global tidb_super_read_only = off")
|
|
tk.MustExec("set global tidb_ttl_job_enable = on")
|
|
|
|
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)))
|
|
|
|
rs, err := tk.Exec("show variables like 'tidb_super_read_only'")
|
|
require.NoError(t, err)
|
|
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
|
|
require.NoError(t, err)
|
|
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.On)
|
|
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
|
|
require.NoError(t, err)
|
|
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
|
|
// second try with `tidb_gc_enable` = off and `tidb_super_read_only` = on and `tidb_ttl_job_enable` = off
|
|
tk.MustExec("set global tidb_gc_enable = off")
|
|
tk.MustExec("set global tidb_super_read_only = on")
|
|
tk.MustExec("set global tidb_ttl_job_enable = off")
|
|
|
|
ts, err = tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
|
|
require.NoError(t, err)
|
|
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)))
|
|
rs, err = tk.Exec("show variables like 'tidb_super_read_only'")
|
|
require.NoError(t, err)
|
|
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.On)
|
|
rs, err = tk.Exec("show variables like 'tidb_gc_enable'")
|
|
require.NoError(t, err)
|
|
require.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
|
|
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep")
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS"))
|
|
}
|
|
|
|
func TestCancelFlashbackCluster(t *testing.T) {
|
|
store := testkit.CreateMockStore(t)
|
|
tk := testkit.NewTestKit(t, store)
|
|
|
|
time.Sleep(10 * time.Millisecond)
|
|
ts, err := tk.Session().GetStore().GetOracle().GetTimestamp(context.Background(), &oracle.Option{})
|
|
require.NoError(t, err)
|
|
|
|
injectSafeTS := oracle.GoTimeToTS(oracle.GetTimeFromTS(ts).Add(10 * time.Second))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest", `return(true)`))
|
|
require.NoError(t, failpoint.Enable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS",
|
|
fmt.Sprintf("return(%v)", injectSafeTS)))
|
|
|
|
timeBeforeDrop, _, safePointSQL, resetGC := MockGC(tk)
|
|
defer resetGC()
|
|
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
|
|
|
// Try canceled on StateDeleteOnly, cancel success
|
|
hook := newCancelJobHook(t, store, func(job *model.Job) bool {
|
|
return job.SchemaState == model.StateDeleteOnly
|
|
})
|
|
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterWaitSchemaSynced", hook.OnJobUpdated)
|
|
tk.MustExec("set global tidb_ttl_job_enable = on")
|
|
tk.MustGetErrCode(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)), errno.ErrCancelledDDLJob)
|
|
hook.MustCancelDone(t)
|
|
|
|
rs, err := tk.Exec("show variables like 'tidb_ttl_job_enable'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.On)
|
|
|
|
// Try canceled on StateWriteReorganization, cancel failed
|
|
hook = newCancelJobHook(t, store, func(job *model.Job) bool {
|
|
return job.SchemaState == model.StateWriteReorganization
|
|
})
|
|
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/afterWaitSchemaSynced", hook.OnJobUpdated)
|
|
tk.MustExec(fmt.Sprintf("flashback cluster to timestamp '%s'", oracle.GetTimeFromTS(ts).Format(types.TimeFSPFormat)))
|
|
hook.MustCancelFailed(t)
|
|
|
|
rs, err = tk.Exec("show variables like 'tidb_ttl_job_enable'")
|
|
assert.NoError(t, err)
|
|
assert.Equal(t, tk.ResultSetToResult(rs, "").Rows()[0][1], vardef.Off)
|
|
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/mockFlashbackTest"))
|
|
require.NoError(t, failpoint.Disable("github.com/pingcap/tidb/pkg/ddl/injectSafeTS"))
|
|
}
|