Files
tidb/pkg/ddl/tests/adminpause/pause_cancel_test.go

236 lines
8.7 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package adminpause
import (
"context"
"fmt"
"strconv"
"sync/atomic"
"testing"
testddlutil "github.com/pingcap/tidb/pkg/ddl/testutil"
"github.com/pingcap/tidb/pkg/domain"
"github.com/pingcap/tidb/pkg/errno"
"github.com/pingcap/tidb/pkg/meta/model"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util/sqlexec"
"github.com/stretchr/testify/require"
"go.uber.org/zap"
)
// pauseAndCancelStmt pauses and cancel the `stmtCase`
// The variables cancelResultChn, pauseResultChn should not be out of the function domain because of parallel case
// execution
func pauseAndCancelStmt(t *testing.T, stmtKit *testkit.TestKit, adminCommandKit *testkit.TestKit, dom *domain.Domain, stmtCase *StmtCase) {
Logger.Info("pauseAndCancelStmt: case start,",
zap.Int("Global ID", stmtCase.globalID),
zap.String("Statement", stmtCase.stmt),
zap.String("Schema state", stmtCase.schemaState.String()),
zap.Strings("Pre-condition", stmtCase.preConditionStmts),
zap.Strings("Rollback statement", stmtCase.rollbackStmts),
zap.Bool("Job pausable", stmtCase.isJobPausable))
var jobID = &atomic.Int64{}
var isPaused = &atomic.Bool{}
var pauseResultChn = make(chan []sqlexec.RecordSet, 1)
var pauseErrChn = make(chan error, 1)
var pauseFunc = func(job *model.Job) {
Logger.Debug("pauseAndCancelStmt: OnJobRunBeforeExported, ",
zap.String("Job Type", job.Type.String()),
zap.String("Job State", job.State.String()),
zap.String("Job Schema State", job.SchemaState.String()),
zap.String("Expected Schema State", stmtCase.schemaState.String()))
// stmtCase is read-only among the whole test suite which is not necessary to be atomic
if testddlutil.MatchCancelState(t, job, stmtCase.schemaState, stmtCase.stmt) &&
stmtCase.isJobPausable && //
!isPaused.Load() {
jobID.Store(job.ID)
var pauseStmt = "admin pause ddl jobs " + strconv.FormatInt(jobID.Load(), 10)
var pr, pe = adminCommandKit.Session().Execute(context.Background(), pauseStmt)
pauseErrChn <- pe
pauseResultChn <- pr
Logger.Info("pauseAndCancelStmt: pause command by hook.OnJobRunBeforeExported, result to channel")
// In case that it runs into this scope again and again
isPaused.CompareAndSwap(false, true)
}
}
var verifyPauseResult = func(t *testing.T, adminCommandKit *testkit.TestKit) {
require.True(t, isPaused.Load())
pauseErr := <-pauseErrChn
pauseResult := <-pauseResultChn
require.NoError(t, pauseErr)
result := adminCommandKit.ResultSetToResultWithCtx(context.Background(),
pauseResult[0], "pause ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID.Load())))
isPaused.CompareAndSwap(true, false)
}
var isCancelled = &atomic.Bool{}
var cancelResultChn = make(chan []sqlexec.RecordSet, 1)
var cancelErrChn = make(chan error, 1)
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob", func(*model.Job) {
Logger.Debug("pauseAndCancelStmt: OnGetJobBeforeExported, ",
zap.String("Expected Schema State", stmtCase.schemaState.String()))
if isPaused.Load() && !isCancelled.Load() {
// Only the 'OnGetJobBeforeExported' hook works for `resume`, because the only job has been paused that
// it could not get into other hooks.
var cancelStmt = "admin cancel ddl jobs " + strconv.FormatInt(jobID.Load(), 10)
rr, re := adminCommandKit.Session().Execute(context.Background(), cancelStmt)
cancelErrChn <- re
cancelResultChn <- rr
Logger.Info("pauseAndCancelStmt: cancel command by hook.OnGetJobBeforeExported, result to channel")
isCancelled.CompareAndSwap(false, true) // In case that it runs into this scope again and again
}
})
var verifyCancelResult = func(t *testing.T, adminCommandKit *testkit.TestKit) {
require.True(t, isCancelled.Load())
cancelErr := <-cancelErrChn
cancelResult := <-cancelResultChn
require.NoError(t, cancelErr)
result := adminCommandKit.ResultSetToResultWithCtx(context.Background(),
cancelResult[0], "resume ddl job successfully")
result.Check(testkit.Rows(fmt.Sprintf("%d successful", jobID.Load())))
isCancelled.CompareAndSwap(true, false)
}
for _, prepareStmt := range stmtCase.preConditionStmts {
stmtKit.MustExec(prepareStmt)
}
testfailpoint.EnableCall(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep", pauseFunc)
isPaused.Store(false)
isCancelled.Store(false)
Logger.Debug("pauseAndCancelStmt: statement execute", zap.String("DDL Statement", stmtCase.stmt))
if stmtCase.isJobPausable {
stmtKit.MustGetErrCode(stmtCase.stmt, errno.ErrCancelledDDLJob)
Logger.Info("pauseAndCancelStmt: statement execution should have been cancelled.")
verifyPauseResult(t, adminCommandKit)
verifyCancelResult(t, adminCommandKit)
} else {
stmtKit.MustExec(stmtCase.stmt)
Logger.Info("pauseAndCancelStmt: statement execution should have been finished successfully.")
require.False(t, isPaused.Load())
require.False(t, isCancelled.Load())
}
// Release the hook, so that we could run the `rollbackStmts` successfully.
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRunOneJobStep")
testfailpoint.Disable(t, "github.com/pingcap/tidb/pkg/ddl/beforeRefreshJob")
for _, rollbackStmt := range stmtCase.rollbackStmts {
// no care about the result here, since the `statement` could have been cancelled OR finished successfully.
_, _ = stmtKit.Exec(rollbackStmt)
}
Logger.Info("pauseAndCancelStmt: statement case finished, ",
zap.String("Global ID", strconv.Itoa(stmtCase.globalID)))
}
func TestPauseCancelAndRerunSchemaStmt(t *testing.T) {
var dom, stmtKit, adminCommandKit = prepareDomain(t)
require.Nil(t, generateTblUser(stmtKit, 10))
for _, stmtCase := range schemaDDLStmtCase {
pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase)
Logger.Info("TestPauseCancelAndRerunSchemaStmt: statement execution again after `admin cancel`",
zap.String("DDL Statement", stmtCase.stmt))
stmtCase.simpleRunStmt(stmtKit)
}
for _, stmtCase := range tableDDLStmt {
pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase)
Logger.Info("TestPauseCancelAndRerunSchemaStmt: statement execution again after `admin cancel`",
zap.String("DDL Statement", stmtCase.stmt))
stmtCase.simpleRunStmt(stmtKit)
}
for _, stmtCase := range placeRulDDLStmtCase {
pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase)
Logger.Info("TestPauseCancelAndRerunSchemaStmt: statement execution again after `admin cancel`",
zap.String("DDL Statement", stmtCase.stmt))
stmtCase.simpleRunStmt(stmtKit)
}
}
func TestPauseCancelAndRerunIndexStmt(t *testing.T) {
var dom, stmtKit, adminCommandKit = prepareDomain(t)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/infoschema/mockTiFlashStoreCount", `return(true)`)
testfailpoint.Enable(t, "github.com/pingcap/tidb/pkg/ddl/MockCheckColumnarIndexProcess", `return(1)`)
require.Nil(t, generateTblUser(stmtKit, 10))
require.Nil(t, generateTblUserWithVec(stmtKit, 10))
for _, stmtCase := range indexDDLStmtCase {
pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase)
Logger.Info("TestPauseCancelAndRerunIndexStmt: statement execution again after `admin cancel`",
zap.String("DDL Statement", stmtCase.stmt))
stmtCase.simpleRunStmt(stmtKit)
}
}
func TestPauseCancelAndRerunColumnStmt(t *testing.T) {
var dom, stmtKit, adminCommandKit = prepareDomain(t)
require.Nil(t, generateTblUser(stmtKit, 10))
for _, stmtCase := range columnDDLStmtCase {
pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase)
Logger.Info("TestPauseCancelAndRerunColumnStmt: statement execution again after `admin cancel`",
zap.String("DDL Statement", stmtCase.stmt))
stmtCase.simpleRunStmt(stmtKit)
}
}
func TestPauseCancelAndRerunPartitionTableStmt(t *testing.T) {
var dom, stmtKit, adminCommandKit = prepareDomain(t)
require.Nil(t, generateTblUser(stmtKit, 0))
require.Nil(t, generateTblUserParition(stmtKit))
for _, stmtCase := range tablePartitionDDLStmtCase {
pauseAndCancelStmt(t, stmtKit, adminCommandKit, dom, &stmtCase)
Logger.Info("TestPauseCancelAndRerunPartitionTableStmt: statement execution again after `admin cancel`",
zap.String("DDL Statement", stmtCase.stmt))
stmtCase.simpleRunStmt(stmtKit)
}
}