ddl: add restore deleted table (#7937)
This commit is contained in:
@ -214,6 +214,7 @@ type DDL interface {
|
||||
CreateView(ctx sessionctx.Context, stmt *ast.CreateViewStmt) error
|
||||
CreateTableWithLike(ctx sessionctx.Context, ident, referIdent ast.Ident, ifNotExists bool) error
|
||||
DropTable(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
|
||||
RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error)
|
||||
DropView(ctx sessionctx.Context, tableIdent ast.Ident) (err error)
|
||||
CreateIndex(ctx sessionctx.Context, tableIdent ast.Ident, unique bool, indexName model.CIStr,
|
||||
columnNames []*ast.IndexColName, indexOption *ast.IndexOption) error
|
||||
|
||||
@ -1064,6 +1064,33 @@ func (d *ddl) CreateTable(ctx sessionctx.Context, s *ast.CreateTableStmt) (err e
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (d *ddl) RestoreTable(ctx sessionctx.Context, tbInfo *model.TableInfo, schemaID, autoID, dropJobID int64, snapshotTS uint64) (err error) {
|
||||
is := d.GetInformationSchema(ctx)
|
||||
// Check schema exist.
|
||||
schema, ok := is.SchemaByID(schemaID)
|
||||
if !ok {
|
||||
return errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Schema ID %d)", schemaID),
|
||||
))
|
||||
}
|
||||
// Check not exist table with same name.
|
||||
if ok := is.TableExists(schema.Name, tbInfo.Name); ok {
|
||||
return infoschema.ErrTableExists.GenWithStackByArgs(tbInfo.Name)
|
||||
}
|
||||
|
||||
tbInfo.State = model.StateNone
|
||||
job := &model.Job{
|
||||
SchemaID: schemaID,
|
||||
TableID: tbInfo.ID,
|
||||
Type: model.ActionRestoreTable,
|
||||
BinlogInfo: &model.HistoryInfo{},
|
||||
Args: []interface{}{tbInfo, autoID, dropJobID, snapshotTS, restoreTableCheckFlagNone},
|
||||
}
|
||||
err = d.doDDLJob(ctx, job)
|
||||
err = d.callHookOnChanged(err)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func (d *ddl) CreateView(ctx sessionctx.Context, s *ast.CreateViewStmt) (err error) {
|
||||
ident := ast.Ident{Name: s.ViewName.Name, Schema: s.ViewName.Schema}
|
||||
is := d.GetInformationSchema(ctx)
|
||||
|
||||
@ -277,9 +277,13 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
|
||||
case model.ActionDropSchema, model.ActionDropTable, model.ActionTruncateTable, model.ActionDropIndex, model.ActionDropTablePartition, model.ActionTruncateTablePartition:
|
||||
err = w.deleteRange(job)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
switch job.Type {
|
||||
case model.ActionRestoreTable:
|
||||
err = finishRestoreTable(w, t, job)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
_, err = t.DeQueueDDLJob()
|
||||
@ -293,6 +297,23 @@ func (w *worker) finishDDLJob(t *meta.Meta, job *model.Job) (err error) {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func finishRestoreTable(w *worker, t *meta.Meta, job *model.Job) error {
|
||||
tbInfo := &model.TableInfo{}
|
||||
var autoID, dropJobID, restoreTableCheckFlag int64
|
||||
var snapshotTS uint64
|
||||
err := job.DecodeArgs(tbInfo, &autoID, &dropJobID, &snapshotTS, &restoreTableCheckFlag)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if restoreTableCheckFlag == restoreTableCheckFlagEnableGC {
|
||||
err = enableGC(w)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func isDependencyJobDone(t *meta.Meta, job *model.Job) (bool, error) {
|
||||
if job.DependencyID == noneDependencyJob {
|
||||
return true, nil
|
||||
@ -497,6 +518,8 @@ func (w *worker) runDDLJob(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64,
|
||||
ver, err = onAddTablePartition(t, job)
|
||||
case model.ActionModifyTableCharsetAndCollate:
|
||||
ver, err = onModifyTableCharsetAndCollate(t, job)
|
||||
case model.ActionRestoreTable:
|
||||
ver, err = w.onRestoreTable(d, t, job)
|
||||
default:
|
||||
// Invalid job, cancel it.
|
||||
job.State = model.JobStateCancelled
|
||||
|
||||
@ -19,6 +19,7 @@ import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/parser/model"
|
||||
@ -38,9 +39,16 @@ const (
|
||||
delBackLog = 128
|
||||
)
|
||||
|
||||
// enableEmulatorGC means whether to enable emulator GC. The default is enable.
|
||||
// In some unit tests, we want to stop emulator GC, then wen can set enableEmulatorGC to 0.
|
||||
var emulatorGCEnable = int32(1)
|
||||
|
||||
type delRangeManager interface {
|
||||
// addDelRangeJob add a DDL job into gc_delete_range table.
|
||||
addDelRangeJob(job *model.Job) error
|
||||
// removeFromGCDeleteRange removes the deleting table job from gc_delete_range table by jobID and tableID.
|
||||
// It's use for recover the table that was mistakenly deleted.
|
||||
removeFromGCDeleteRange(jobID, tableID int64) error
|
||||
start()
|
||||
clear()
|
||||
}
|
||||
@ -90,6 +98,17 @@ func (dr *delRange) addDelRangeJob(job *model.Job) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeFromGCDeleteRange implements delRangeManager interface.
|
||||
func (dr *delRange) removeFromGCDeleteRange(jobID, tableID int64) error {
|
||||
ctx, err := dr.sessPool.get()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer dr.sessPool.put(ctx)
|
||||
err = util.RemoveFromGCDeleteRange(ctx, jobID, tableID)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// start implements delRangeManager interface.
|
||||
func (dr *delRange) start() {
|
||||
if !dr.storeSupport {
|
||||
@ -117,11 +136,28 @@ func (dr *delRange) startEmulator() {
|
||||
case <-dr.quitCh:
|
||||
return
|
||||
}
|
||||
err := dr.doDelRangeWork()
|
||||
terror.Log(errors.Trace(err))
|
||||
if IsEmulatorGCEnable() {
|
||||
err := dr.doDelRangeWork()
|
||||
terror.Log(errors.Trace(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// EmulatorGCEnable enables emulator gc. It exports for testing.
|
||||
func EmulatorGCEnable() {
|
||||
atomic.StoreInt32(&emulatorGCEnable, 1)
|
||||
}
|
||||
|
||||
// EmulatorGCDisable disables emulator gc. It exports for testing.
|
||||
func EmulatorGCDisable() {
|
||||
atomic.StoreInt32(&emulatorGCEnable, 0)
|
||||
}
|
||||
|
||||
// IsEmulatorGCEnable indicates whether emulator GC enabled. It exports for testing.
|
||||
func IsEmulatorGCEnable() bool {
|
||||
return atomic.LoadInt32(&emulatorGCEnable) == 1
|
||||
}
|
||||
|
||||
func (dr *delRange) doDelRangeWork() error {
|
||||
ctx, err := dr.sessPool.get()
|
||||
if err != nil {
|
||||
|
||||
@ -126,6 +126,11 @@ func (dr *mockDelRange) addDelRangeJob(job *model.Job) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// removeFromGCDeleteRange implements delRangeManager interface.
|
||||
func (dr *mockDelRange) removeFromGCDeleteRange(jobID, tableID int64) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// start implements delRangeManager interface.
|
||||
func (dr *mockDelRange) start() {
|
||||
return
|
||||
|
||||
@ -15,6 +15,8 @@ package ddl_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
. "github.com/pingcap/check"
|
||||
@ -23,10 +25,12 @@ import (
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/tidb/ddl"
|
||||
"github.com/pingcap/tidb/domain"
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
"github.com/pingcap/tidb/kv"
|
||||
"github.com/pingcap/tidb/session"
|
||||
"github.com/pingcap/tidb/store/mockstore"
|
||||
"github.com/pingcap/tidb/util/admin"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
"github.com/pingcap/tidb/util/mock"
|
||||
"github.com/pingcap/tidb/util/testkit"
|
||||
"github.com/pingcap/tidb/util/testleak"
|
||||
@ -123,3 +127,337 @@ func (s *testSerialSuite) TestCancelAddIndexPanic(c *C) {
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(err.Error(), Equals, "[ddl:12]cancelled DDL job")
|
||||
}
|
||||
|
||||
func (s *testSerialSuite) TestRestoreTableByJobID(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("create database if not exists test_restore")
|
||||
tk.MustExec("use test_restore")
|
||||
tk.MustExec("drop table if exists t_recover")
|
||||
tk.MustExec("create table t_recover (a int);")
|
||||
defer func(originGC bool) {
|
||||
if originGC {
|
||||
ddl.EmulatorGCEnable()
|
||||
} else {
|
||||
ddl.EmulatorGCDisable()
|
||||
}
|
||||
}(ddl.IsEmulatorGCEnable())
|
||||
|
||||
// disable emulator GC.
|
||||
// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
|
||||
ddl.EmulatorGCDisable()
|
||||
gcTimeFormat := "20060102-15:04:05 -0700 MST"
|
||||
timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat)
|
||||
timeAfterDrop := time.Now().Add(time.Duration(48 * 60 * 60 * time.Second)).Format(gcTimeFormat)
|
||||
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
|
||||
ON DUPLICATE KEY
|
||||
UPDATE variable_value = '%[1]s'`
|
||||
// clear GC variables first.
|
||||
tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )")
|
||||
|
||||
tk.MustExec("insert into t_recover values (1),(2),(3)")
|
||||
tk.MustExec("drop table t_recover")
|
||||
|
||||
rs, err := tk.Exec("admin show ddl jobs")
|
||||
c.Assert(err, IsNil)
|
||||
rows, err := session.GetRows4Test(context.Background(), tk.Se, rs)
|
||||
c.Assert(err, IsNil)
|
||||
row := rows[0]
|
||||
c.Assert(row.GetString(1), Equals, "test_restore")
|
||||
c.Assert(row.GetString(3), Equals, "drop table")
|
||||
jobID := row.GetInt64(0)
|
||||
|
||||
// if GC safe point is not exists in mysql.tidb
|
||||
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'")
|
||||
// set GC safe point
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
||||
|
||||
// if GC enable is not exists in mysql.tidb
|
||||
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'")
|
||||
|
||||
err = gcutil.EnableGC(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// recover job is before GC safe point
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeAfterDrop))
|
||||
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(strings.Contains(err.Error(), "snapshot is older than GC safe point"), Equals, true)
|
||||
|
||||
// set GC safe point
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
||||
// if there is a new table with the same name, should return failed.
|
||||
tk.MustExec("create table t_recover (a int);")
|
||||
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
c.Assert(err.Error(), Equals, infoschema.ErrTableExists.GenWithStackByArgs("t_recover").Error())
|
||||
|
||||
// drop the new table with the same name, then restore table.
|
||||
tk.MustExec("drop table t_recover")
|
||||
|
||||
// do restore table.
|
||||
tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
|
||||
// check recover table meta and data record.
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3"))
|
||||
// check recover table autoID.
|
||||
tk.MustExec("insert into t_recover values (4),(5),(6)")
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
|
||||
|
||||
// restore table by none exits job.
|
||||
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", 10000000))
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
// Disable GC by manual first, then after recover table, the GC enable status should also be disabled.
|
||||
err = gcutil.DisableGC(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
tk.MustExec("delete from t_recover where a > 1")
|
||||
tk.MustExec("drop table t_recover")
|
||||
rs, err = tk.Exec("admin show ddl jobs")
|
||||
c.Assert(err, IsNil)
|
||||
rows, err = session.GetRows4Test(context.Background(), tk.Se, rs)
|
||||
c.Assert(err, IsNil)
|
||||
row = rows[0]
|
||||
c.Assert(row.GetString(1), Equals, "test_restore")
|
||||
c.Assert(row.GetString(3), Equals, "drop table")
|
||||
jobID = row.GetInt64(0)
|
||||
|
||||
tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
|
||||
// check recover table meta and data record.
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1"))
|
||||
// check recover table autoID.
|
||||
tk.MustExec("insert into t_recover values (7),(8),(9)")
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "7", "8", "9"))
|
||||
|
||||
gcEnable, err := gcutil.CheckGCEnable(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(gcEnable, Equals, false)
|
||||
}
|
||||
|
||||
func (s *testSerialSuite) TestRestoreTableByTableName(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("create database if not exists test_restore")
|
||||
tk.MustExec("use test_restore")
|
||||
tk.MustExec("drop table if exists t_recover, t_recover2")
|
||||
tk.MustExec("create table t_recover (a int);")
|
||||
defer func(originGC bool) {
|
||||
if originGC {
|
||||
ddl.EmulatorGCEnable()
|
||||
} else {
|
||||
ddl.EmulatorGCDisable()
|
||||
}
|
||||
}(ddl.IsEmulatorGCEnable())
|
||||
|
||||
// disable emulator GC.
|
||||
// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
|
||||
ddl.EmulatorGCDisable()
|
||||
gcTimeFormat := "20060102-15:04:05 -0700 MST"
|
||||
timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat)
|
||||
timeAfterDrop := time.Now().Add(time.Duration(48 * 60 * 60 * time.Second)).Format(gcTimeFormat)
|
||||
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
|
||||
ON DUPLICATE KEY
|
||||
UPDATE variable_value = '%[1]s'`
|
||||
// clear GC variables first.
|
||||
tk.MustExec("delete from mysql.tidb where variable_name in ( 'tikv_gc_safe_point','tikv_gc_enable' )")
|
||||
|
||||
tk.MustExec("insert into t_recover values (1),(2),(3)")
|
||||
tk.MustExec("drop table t_recover")
|
||||
|
||||
// if GC safe point is not exists in mysql.tidb
|
||||
_, err := tk.Exec("admin restore table t_recover")
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(err.Error(), Equals, "can not get 'tikv_gc_safe_point'")
|
||||
// set GC safe point
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
||||
|
||||
// if GC enable is not exists in mysql.tidb
|
||||
_, err = tk.Exec("admin restore table t_recover")
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(err.Error(), Equals, "[ddl:-1]can not get 'tikv_gc_enable'")
|
||||
|
||||
err = gcutil.EnableGC(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
// recover job is before GC safe point
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeAfterDrop))
|
||||
_, err = tk.Exec("admin restore table t_recover")
|
||||
c.Assert(err, NotNil)
|
||||
c.Assert(strings.Contains(err.Error(), "snapshot is older than GC safe point"), Equals, true)
|
||||
|
||||
// set GC safe point
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
||||
// if there is a new table with the same name, should return failed.
|
||||
tk.MustExec("create table t_recover (a int);")
|
||||
_, err = tk.Exec("admin restore table t_recover")
|
||||
c.Assert(err.Error(), Equals, infoschema.ErrTableExists.GenWithStackByArgs("t_recover").Error())
|
||||
|
||||
// drop the new table with the same name, then restore table.
|
||||
tk.MustExec("rename table t_recover to t_recover2")
|
||||
|
||||
// do restore table.
|
||||
tk.MustExec("admin restore table t_recover")
|
||||
|
||||
// check recover table meta and data record.
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3"))
|
||||
// check recover table autoID.
|
||||
tk.MustExec("insert into t_recover values (4),(5),(6)")
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
|
||||
// check rebase auto id.
|
||||
tk.MustQuery("select a,_tidb_rowid from t_recover;").Check(testkit.Rows("1 1", "2 2", "3 3", "4 5001", "5 5002", "6 5003"))
|
||||
|
||||
// restore table by none exits job.
|
||||
_, err = tk.Exec(fmt.Sprintf("admin restore table by job %d", 10000000))
|
||||
c.Assert(err, NotNil)
|
||||
|
||||
// Disable GC by manual first, then after recover table, the GC enable status should also be disabled.
|
||||
err = gcutil.DisableGC(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
|
||||
tk.MustExec("delete from t_recover where a > 1")
|
||||
tk.MustExec("drop table t_recover")
|
||||
|
||||
tk.MustExec("admin restore table t_recover")
|
||||
|
||||
// check recover table meta and data record.
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1"))
|
||||
// check recover table autoID.
|
||||
tk.MustExec("insert into t_recover values (7),(8),(9)")
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "7", "8", "9"))
|
||||
|
||||
gcEnable, err := gcutil.CheckGCEnable(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(gcEnable, Equals, false)
|
||||
}
|
||||
|
||||
func (s *testSerialSuite) TestRestoreTableByJobIDFail(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("create database if not exists test_restore")
|
||||
tk.MustExec("use test_restore")
|
||||
tk.MustExec("drop table if exists t_recover")
|
||||
tk.MustExec("create table t_recover (a int);")
|
||||
defer func(originGC bool) {
|
||||
if originGC {
|
||||
ddl.EmulatorGCEnable()
|
||||
} else {
|
||||
ddl.EmulatorGCDisable()
|
||||
}
|
||||
}(ddl.IsEmulatorGCEnable())
|
||||
|
||||
// disable emulator GC.
|
||||
// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
|
||||
ddl.EmulatorGCDisable()
|
||||
gcTimeFormat := "20060102-15:04:05 -0700 MST"
|
||||
timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat)
|
||||
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
|
||||
ON DUPLICATE KEY
|
||||
UPDATE variable_value = '%[1]s'`
|
||||
|
||||
tk.MustExec("insert into t_recover values (1),(2),(3)")
|
||||
tk.MustExec("drop table t_recover")
|
||||
|
||||
rs, err := tk.Exec("admin show ddl jobs")
|
||||
c.Assert(err, IsNil)
|
||||
rows, err := session.GetRows4Test(context.Background(), tk.Se, rs)
|
||||
c.Assert(err, IsNil)
|
||||
row := rows[0]
|
||||
c.Assert(row.GetString(1), Equals, "test_restore")
|
||||
c.Assert(row.GetString(3), Equals, "drop table")
|
||||
jobID := row.GetInt64(0)
|
||||
|
||||
// enableGC first
|
||||
err = gcutil.EnableGC(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
||||
|
||||
// set hook
|
||||
hook := &ddl.TestDDLCallback{}
|
||||
hook.OnJobRunBeforeExported = func(job *model.Job) {
|
||||
if job.Type == model.ActionRestoreTable {
|
||||
gofail.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`)
|
||||
gofail.Enable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr", `return(true)`)
|
||||
}
|
||||
}
|
||||
origHook := s.dom.DDL().GetHook()
|
||||
defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook)
|
||||
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
|
||||
|
||||
// do restore table.
|
||||
tk.MustExec(fmt.Sprintf("admin restore table by job %d", jobID))
|
||||
gofail.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError")
|
||||
gofail.Disable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr")
|
||||
|
||||
// make sure enable GC after restore table.
|
||||
enable, err := gcutil.CheckGCEnable(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(enable, Equals, true)
|
||||
|
||||
// check recover table meta and data record.
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3"))
|
||||
// check recover table autoID.
|
||||
tk.MustExec("insert into t_recover values (4),(5),(6)")
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
|
||||
}
|
||||
|
||||
func (s *testSerialSuite) TestRestoreTableByTableNameFail(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
tk.MustExec("create database if not exists test_restore")
|
||||
tk.MustExec("use test_restore")
|
||||
tk.MustExec("drop table if exists t_recover")
|
||||
tk.MustExec("create table t_recover (a int);")
|
||||
defer func(originGC bool) {
|
||||
if originGC {
|
||||
ddl.EmulatorGCEnable()
|
||||
} else {
|
||||
ddl.EmulatorGCDisable()
|
||||
}
|
||||
}(ddl.IsEmulatorGCEnable())
|
||||
|
||||
// disable emulator GC.
|
||||
// Otherwise emulator GC will delete table record as soon as possible after execute drop table ddl.
|
||||
ddl.EmulatorGCDisable()
|
||||
gcTimeFormat := "20060102-15:04:05 -0700 MST"
|
||||
timeBeforeDrop := time.Now().Add(0 - time.Duration(48*60*60*time.Second)).Format(gcTimeFormat)
|
||||
safePointSQL := `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('tikv_gc_safe_point', '%[1]s', '')
|
||||
ON DUPLICATE KEY
|
||||
UPDATE variable_value = '%[1]s'`
|
||||
|
||||
tk.MustExec("insert into t_recover values (1),(2),(3)")
|
||||
tk.MustExec("drop table t_recover")
|
||||
|
||||
// enableGC first
|
||||
err := gcutil.EnableGC(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
tk.MustExec(fmt.Sprintf(safePointSQL, timeBeforeDrop))
|
||||
|
||||
// set hook
|
||||
hook := &ddl.TestDDLCallback{}
|
||||
hook.OnJobRunBeforeExported = func(job *model.Job) {
|
||||
if job.Type == model.ActionRestoreTable {
|
||||
gofail.Enable("github.com/pingcap/tidb/store/tikv/mockCommitError", `return(true)`)
|
||||
gofail.Enable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr", `return(true)`)
|
||||
}
|
||||
}
|
||||
origHook := s.dom.DDL().GetHook()
|
||||
defer s.dom.DDL().(ddl.DDLForTest).SetHook(origHook)
|
||||
s.dom.DDL().(ddl.DDLForTest).SetHook(hook)
|
||||
|
||||
// do restore table.
|
||||
tk.MustExec("admin restore table t_recover")
|
||||
gofail.Disable("github.com/pingcap/tidb/store/tikv/mockCommitError")
|
||||
gofail.Disable("github.com/pingcap/tidb/ddl/mockRestoreTableCommitErr")
|
||||
|
||||
// make sure enable GC after restore table.
|
||||
enable, err := gcutil.CheckGCEnable(tk.Se)
|
||||
c.Assert(err, IsNil)
|
||||
c.Assert(enable, Equals, true)
|
||||
|
||||
// check recover table meta and data record.
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3"))
|
||||
// check recover table autoID.
|
||||
tk.MustExec("insert into t_recover values (4),(5),(6)")
|
||||
tk.MustQuery("select * from t_recover;").Check(testkit.Rows("1", "2", "3", "4", "5", "6"))
|
||||
}
|
||||
|
||||
150
ddl/table.go
150
ddl/table.go
@ -28,6 +28,7 @@ import (
|
||||
"github.com/pingcap/tidb/meta/autoid"
|
||||
"github.com/pingcap/tidb/table"
|
||||
"github.com/pingcap/tidb/tablecodec"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -147,6 +148,155 @@ func onDropTableOrView(t *meta.Meta, job *model.Job) (ver int64, _ error) {
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
const (
|
||||
restoreTableCheckFlagNone int64 = iota
|
||||
restoreTableCheckFlagEnableGC
|
||||
restoreTableCheckFlagDisableGC
|
||||
)
|
||||
|
||||
func (w *worker) onRestoreTable(d *ddlCtx, t *meta.Meta, job *model.Job) (ver int64, err error) {
|
||||
schemaID := job.SchemaID
|
||||
tblInfo := &model.TableInfo{}
|
||||
var autoID, dropJobID, restoreTableCheckFlag int64
|
||||
var snapshotTS uint64
|
||||
if err = job.DecodeArgs(tblInfo, &autoID, &dropJobID, &snapshotTS, &restoreTableCheckFlag); err != nil {
|
||||
// Invalid arguments, cancel this job.
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
// check GC and safe point
|
||||
gcEnable, err := checkGCEnable(w)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
err = checkTableNotExists(t, job, schemaID, tblInfo.Name.L)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
// Restore table divide into 2 steps:
|
||||
// 1. Check GC enable status, to decided whether enable GC after restore table.
|
||||
// a. Why not disable GC before put the job to DDL job queue?
|
||||
// Think about concurrency problem. If a restore job-1 is doing and already disabled GC,
|
||||
// then, another restore table job-2 check GC enable will get disable before into the job queue.
|
||||
// then, after restore table job-2 finished, the GC will be disabled.
|
||||
// b. Why split into 2 steps? 1 step also can finish this job: check GC -> disable GC -> restore table -> finish job.
|
||||
// What if the transaction commit failed? then, the job will retry, but the GC already disabled when first running.
|
||||
// So, after this job retry succeed, the GC will be disabled.
|
||||
// 2. Do restore table job.
|
||||
// a. Check whether GC enabled, if enabled, disable GC first.
|
||||
// b. Check GC safe point. If drop table time if after safe point time, then can do restore.
|
||||
// otherwise, can't restore table, because the records of the table may already delete by gc.
|
||||
// c. Remove GC task of the table from gc_delete_range table.
|
||||
// d. Create table and rebase table auto ID.
|
||||
// e. Finish.
|
||||
switch tblInfo.State {
|
||||
case model.StateNone:
|
||||
// none -> write only
|
||||
// check GC enable and update flag.
|
||||
if gcEnable {
|
||||
job.Args[len(job.Args)-1] = restoreTableCheckFlagEnableGC
|
||||
} else {
|
||||
job.Args[len(job.Args)-1] = restoreTableCheckFlagDisableGC
|
||||
}
|
||||
|
||||
job.SchemaState = model.StateWriteOnly
|
||||
tblInfo.State = model.StateWriteOnly
|
||||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, false)
|
||||
case model.StateWriteOnly:
|
||||
// write only -> public
|
||||
// do restore table.
|
||||
if gcEnable {
|
||||
err = disableGC(w)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Errorf("disable gc failed, try again later. err: %v", err)
|
||||
}
|
||||
}
|
||||
// check GC safe point
|
||||
err = checkSafePoint(w, snapshotTS)
|
||||
if err != nil {
|
||||
job.State = model.JobStateCancelled
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
// Remove dropped table DDL job from gc_delete_range table.
|
||||
err = w.delRangeManager.removeFromGCDeleteRange(dropJobID, tblInfo.ID)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
tblInfo.State = model.StatePublic
|
||||
tblInfo.UpdateTS = t.StartTS
|
||||
err = t.CreateTableAndSetAutoID(schemaID, tblInfo, autoID)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
// gofail: var mockRestoreTableCommitErr bool
|
||||
// if mockRestoreTableCommitErr && mockRestoreTableCommitErrOnce {
|
||||
// mockRestoreTableCommitErrOnce = false
|
||||
// kv.MockCommitErrorEnable()
|
||||
// }
|
||||
|
||||
ver, err = updateVersionAndTableInfo(t, job, tblInfo, true)
|
||||
if err != nil {
|
||||
return ver, errors.Trace(err)
|
||||
}
|
||||
|
||||
// Finish this job.
|
||||
job.FinishTableJob(model.JobStateDone, model.StatePublic, ver, tblInfo)
|
||||
default:
|
||||
return ver, ErrInvalidTableState.GenWithStack("invalid restore table state %v", tblInfo.State)
|
||||
}
|
||||
return ver, nil
|
||||
}
|
||||
|
||||
// mockRestoreTableCommitErrOnce uses to make sure `mockRestoreTableCommitErr` only mock error once.
|
||||
var mockRestoreTableCommitErrOnce = true
|
||||
|
||||
func enableGC(w *worker) error {
|
||||
ctx, err := w.sessPool.get()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer w.sessPool.put(ctx)
|
||||
|
||||
return gcutil.EnableGC(ctx)
|
||||
}
|
||||
|
||||
func disableGC(w *worker) error {
|
||||
ctx, err := w.sessPool.get()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer w.sessPool.put(ctx)
|
||||
|
||||
return gcutil.DisableGC(ctx)
|
||||
}
|
||||
|
||||
func checkGCEnable(w *worker) (enable bool, err error) {
|
||||
ctx, err := w.sessPool.get()
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
defer w.sessPool.put(ctx)
|
||||
|
||||
return gcutil.CheckGCEnable(ctx)
|
||||
}
|
||||
|
||||
func checkSafePoint(w *worker, snapshotTS uint64) error {
|
||||
ctx, err := w.sessPool.get()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer w.sessPool.put(ctx)
|
||||
|
||||
return gcutil.ValidateSnapshot(ctx, snapshotTS)
|
||||
}
|
||||
|
||||
type splitableStore interface {
|
||||
SplitRegion(splitKey kv.Key) error
|
||||
}
|
||||
|
||||
@ -109,8 +109,13 @@ func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
sql = fmt.Sprintf(completeDeleteRangeSQL, dr.JobID, dr.ElementID)
|
||||
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
|
||||
return RemoveFromGCDeleteRange(ctx, dr.JobID, dr.ElementID)
|
||||
}
|
||||
|
||||
// RemoveFromGCDeleteRange is exported for ddl pkg to use.
|
||||
func RemoveFromGCDeleteRange(ctx sessionctx.Context, jobID, elementID int64) error {
|
||||
sql := fmt.Sprintf(completeDeleteRangeSQL, jobID, elementID)
|
||||
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
|
||||
@ -255,6 +255,15 @@ func (do *Domain) GetSnapshotInfoSchema(snapshotTS uint64) (infoschema.InfoSchem
|
||||
return snapHandle.Get(), nil
|
||||
}
|
||||
|
||||
// GetSnapshotMeta gets a new snapshot meta at startTS.
|
||||
func (do *Domain) GetSnapshotMeta(startTS uint64) (*meta.Meta, error) {
|
||||
snapshot, err := do.store.GetSnapshot(kv.NewVersion(startTS))
|
||||
if err != nil {
|
||||
return nil, errors.Trace(err)
|
||||
}
|
||||
return meta.NewSnapshotMeta(snapshot), nil
|
||||
}
|
||||
|
||||
// DDL gets DDL from domain.
|
||||
func (do *Domain) DDL() ddl.DDL {
|
||||
return do.ddl
|
||||
|
||||
@ -83,6 +83,8 @@ func (b *executorBuilder) build(p plannercore.Plan) Executor {
|
||||
return b.buildCheckIndexRange(v)
|
||||
case *plannercore.ChecksumTable:
|
||||
return b.buildChecksumTable(v)
|
||||
case *plannercore.RestoreTable:
|
||||
return b.buildRestoreTable(v)
|
||||
case *plannercore.DDL:
|
||||
return b.buildDDL(v)
|
||||
case *plannercore.Deallocate:
|
||||
@ -337,6 +339,16 @@ func (b *executorBuilder) buildRecoverIndex(v *plannercore.RecoverIndex) Executo
|
||||
return e
|
||||
}
|
||||
|
||||
func (b *executorBuilder) buildRestoreTable(v *plannercore.RestoreTable) Executor {
|
||||
e := &RestoreTableExec{
|
||||
baseExecutor: newBaseExecutor(b.ctx, v.Schema(), v.ExplainID()),
|
||||
jobID: v.JobID,
|
||||
Table: v.Table,
|
||||
JobNum: v.JobNum,
|
||||
}
|
||||
return e
|
||||
}
|
||||
|
||||
func buildCleanupIndexCols(tblInfo *model.TableInfo, indexInfo *model.IndexInfo) []*model.ColumnInfo {
|
||||
columns := make([]*model.ColumnInfo, 0, len(indexInfo.Columns)+1)
|
||||
for _, idxCol := range indexInfo.Columns {
|
||||
|
||||
172
executor/ddl.go
172
executor/ddl.go
@ -25,9 +25,13 @@ import (
|
||||
"github.com/pingcap/tidb/config"
|
||||
"github.com/pingcap/tidb/domain"
|
||||
"github.com/pingcap/tidb/infoschema"
|
||||
"github.com/pingcap/tidb/meta"
|
||||
"github.com/pingcap/tidb/planner/core"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util/admin"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
@ -287,3 +291,171 @@ func (e *DDLExec) executeAlterTable(s *ast.AlterTableStmt) error {
|
||||
err := domain.GetDomain(e.ctx).DDL().AlterTable(e.ctx, ti, s.Specs)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// RestoreTableExec represents a recover table executor.
|
||||
// It is built from "admin restore table by job" statement,
|
||||
// is used to recover the table that deleted by mistake.
|
||||
type RestoreTableExec struct {
|
||||
baseExecutor
|
||||
jobID int64
|
||||
Table *ast.TableName
|
||||
JobNum int64
|
||||
}
|
||||
|
||||
// Open implements the Executor Open interface.
|
||||
func (e *RestoreTableExec) Open(ctx context.Context) error {
|
||||
if err := e.baseExecutor.Open(ctx); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Next implements the Executor Open interface.
|
||||
func (e *RestoreTableExec) Next(ctx context.Context, req *chunk.RecordBatch) (err error) {
|
||||
// Should commit the previous transaction and create a new transaction.
|
||||
if err = e.ctx.NewTxn(ctx); err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
defer func() { e.ctx.GetSessionVars().StmtCtx.IsDDLJobInQueue = false }()
|
||||
|
||||
err = e.executeRestoreTable()
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
dom := domain.GetDomain(e.ctx)
|
||||
// Update InfoSchema in TxnCtx, so it will pass schema check.
|
||||
is := dom.InfoSchema()
|
||||
txnCtx := e.ctx.GetSessionVars().TxnCtx
|
||||
txnCtx.InfoSchema = is
|
||||
txnCtx.SchemaVersion = is.SchemaMetaVersion()
|
||||
// DDL will force commit old transaction, after DDL, in transaction status should be false.
|
||||
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *RestoreTableExec) executeRestoreTable() error {
|
||||
txn, err := e.ctx.Txn(true)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
t := meta.NewMeta(txn)
|
||||
dom := domain.GetDomain(e.ctx)
|
||||
var job *model.Job
|
||||
var tblInfo *model.TableInfo
|
||||
if e.jobID != 0 {
|
||||
job, tblInfo, err = getRestoreTableByJobID(e, t, dom)
|
||||
} else {
|
||||
job, tblInfo, err = getRestoreTableByTableName(e, t, dom)
|
||||
}
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
// Get table original autoID before table drop.
|
||||
m, err := dom.GetSnapshotMeta(job.StartTS)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
autoID, err := m.GetAutoTableID(job.SchemaID, job.TableID)
|
||||
if err != nil {
|
||||
return errors.Errorf("recover table_id: %d, get original autoID from snapshot meta err: %s", job.TableID, err.Error())
|
||||
}
|
||||
// Call DDL RestoreTable
|
||||
err = domain.GetDomain(e.ctx).DDL().RestoreTable(e.ctx, tblInfo, job.SchemaID, autoID, job.ID, job.StartTS)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
func getRestoreTableByJobID(e *RestoreTableExec, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
|
||||
job, err := t.GetHistoryDDLJob(e.jobID)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
if job == nil {
|
||||
return nil, nil, admin.ErrDDLJobNotFound.GenWithStackByArgs(e.jobID)
|
||||
}
|
||||
if job.Type != model.ActionDropTable {
|
||||
return nil, nil, errors.Errorf("Job %v type is %v, not drop table", job.ID, job.Type)
|
||||
}
|
||||
|
||||
// Check GC safe point for getting snapshot infoSchema.
|
||||
err = gcutil.ValidateSnapshot(e.ctx, job.StartTS)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
|
||||
// Get the snapshot infoSchema before drop table.
|
||||
snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
// Get table meta from snapshot infoSchema.
|
||||
table, ok := snapInfo.TableByID(job.TableID)
|
||||
if !ok {
|
||||
return nil, nil, infoschema.ErrTableNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Schema ID %d)", job.SchemaID),
|
||||
fmt.Sprintf("(Table ID %d)", job.TableID),
|
||||
)
|
||||
}
|
||||
return job, table.Meta(), nil
|
||||
}
|
||||
|
||||
func getRestoreTableByTableName(e *RestoreTableExec, t *meta.Meta, dom *domain.Domain) (*model.Job, *model.TableInfo, error) {
|
||||
jobs, err := t.GetAllHistoryDDLJobs()
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
var job *model.Job
|
||||
var tblInfo *model.TableInfo
|
||||
gcSafePoint, err := gcutil.GetGCSafePoint(e.ctx)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
schemaName := e.Table.Schema.L
|
||||
if schemaName == "" {
|
||||
schemaName = e.ctx.GetSessionVars().CurrentDB
|
||||
}
|
||||
if schemaName == "" {
|
||||
return nil, nil, errors.Trace(core.ErrNoDB)
|
||||
}
|
||||
// TODO: only search recent `e.JobNum` DDL jobs.
|
||||
for i := len(jobs) - 1; i > 0; i-- {
|
||||
job = jobs[i]
|
||||
if job.Type != model.ActionDropTable {
|
||||
continue
|
||||
}
|
||||
// Check GC safe point for getting snapshot infoSchema.
|
||||
err = gcutil.ValidateSnapshotWithGCSafePoint(job.StartTS, gcSafePoint)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
// Get the snapshot infoSchema before drop table.
|
||||
snapInfo, err := dom.GetSnapshotInfoSchema(job.StartTS)
|
||||
if err != nil {
|
||||
return nil, nil, errors.Trace(err)
|
||||
}
|
||||
// Get table meta from snapshot infoSchema.
|
||||
table, ok := snapInfo.TableByID(job.TableID)
|
||||
if !ok {
|
||||
return nil, nil, infoschema.ErrTableNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Schema ID %d)", job.SchemaID),
|
||||
fmt.Sprintf("(Table ID %d)", job.TableID),
|
||||
)
|
||||
}
|
||||
if table.Meta().Name.L == e.Table.Name.L {
|
||||
schema, ok := dom.InfoSchema().SchemaByID(job.SchemaID)
|
||||
if !ok {
|
||||
return nil, nil, errors.Trace(infoschema.ErrDatabaseNotExists.GenWithStackByArgs(
|
||||
fmt.Sprintf("(Schema ID %d)", job.SchemaID),
|
||||
))
|
||||
}
|
||||
if schema.Name.L == schemaName {
|
||||
tblInfo = table.Meta()
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
if tblInfo == nil {
|
||||
return nil, nil, errors.Errorf("Can't found drop table: %v in ddl history jobs", e.Table.Name)
|
||||
}
|
||||
return job, tblInfo, nil
|
||||
}
|
||||
|
||||
@ -577,7 +577,4 @@ func (s *testSuite3) TestSetDDLReorgBatchSize(c *C) {
|
||||
tk.MustExec("set @@global.tidb_ddl_reorg_batch_size = 1000")
|
||||
res = tk.MustQuery("select @@global.tidb_ddl_reorg_batch_size")
|
||||
res.Check(testkit.Rows("1000"))
|
||||
|
||||
// If do not LoadDDLReorgVars, the local variable will be the last loaded value.
|
||||
c.Assert(variable.GetDDLReorgBatchSize(), Equals, int32(100))
|
||||
}
|
||||
|
||||
@ -24,12 +24,10 @@ import (
|
||||
"github.com/pingcap/parser/terror"
|
||||
"github.com/pingcap/tidb/domain"
|
||||
"github.com/pingcap/tidb/expression"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/types"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/chunk"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
"github.com/pingcap/tidb/util/gcutil"
|
||||
log "github.com/sirupsen/logrus"
|
||||
)
|
||||
|
||||
@ -158,7 +156,7 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
|
||||
}
|
||||
newSnapshotIsSet := sessionVars.SnapshotTS > 0 && sessionVars.SnapshotTS != oldSnapshotTS
|
||||
if newSnapshotIsSet {
|
||||
err = validateSnapshot(e.ctx, sessionVars.SnapshotTS)
|
||||
err = gcutil.ValidateSnapshot(e.ctx, sessionVars.SnapshotTS)
|
||||
if err != nil {
|
||||
sessionVars.SnapshotTS = oldSnapshotTS
|
||||
return errors.Trace(err)
|
||||
@ -183,28 +181,6 @@ func (e *SetExecutor) setSysVariable(name string, v *expression.VarAssignment) e
|
||||
return nil
|
||||
}
|
||||
|
||||
// validateSnapshot checks that the newly set snapshot time is after GC safe point time.
|
||||
func validateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error {
|
||||
sql := "SELECT variable_value FROM mysql.tidb WHERE variable_name = 'tikv_gc_safe_point'"
|
||||
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
return errors.New("can not get 'tikv_gc_safe_point'")
|
||||
}
|
||||
safePointString := rows[0].GetString(0)
|
||||
safePointTime, err := util.CompatibleParseGCTime(safePointString)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
safePointTS := variable.GoTimeToTS(safePointTime)
|
||||
if safePointTS > snapshotTS {
|
||||
return variable.ErrSnapshotTooOld.GenWithStackByArgs(safePointString)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *SetExecutor) setCharset(cs, co string) error {
|
||||
var err error
|
||||
if len(co) == 0 {
|
||||
|
||||
2
go.mod
2
go.mod
@ -48,7 +48,7 @@ require (
|
||||
github.com/pingcap/gofail v0.0.0-20181217135706-6a951c1e42c3
|
||||
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e
|
||||
github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379
|
||||
github.com/pingcap/parser v0.0.0-20190114091152-8b799d66df61
|
||||
github.com/pingcap/parser v0.0.0-20190114105451-005df5698910
|
||||
github.com/pingcap/pd v2.1.0-rc.4+incompatible
|
||||
github.com/pingcap/tidb-tools v2.1.3-0.20190104033906-883b07a04a73+incompatible
|
||||
github.com/pingcap/tipb v0.0.0-20181012112600-11e33c750323
|
||||
|
||||
4
go.sum
4
go.sum
@ -110,8 +110,8 @@ github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e h1:P73/4dPCL96rG
|
||||
github.com/pingcap/goleveldb v0.0.0-20171020122428-b9ff6c35079e/go.mod h1:O17XtbryoCJhkKGbT62+L2OlrniwqiGLSqrmdHCMzZw=
|
||||
github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379 h1:l4KInBOtxjbgQLjCFHzX66vZgNzsH4a+RiuVZGrO0xk=
|
||||
github.com/pingcap/kvproto v0.0.0-20190110035000-d4fe6b336379/go.mod h1:QMdbTAXCHzzygQzqcG9uVUgU2fKeSN1GmfMiykdSzzY=
|
||||
github.com/pingcap/parser v0.0.0-20190114091152-8b799d66df61 h1:J9Z8Xn0MwBMOsB3jUcrirtRh0Df1Nzrv+hBmCyUg6E4=
|
||||
github.com/pingcap/parser v0.0.0-20190114091152-8b799d66df61/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
|
||||
github.com/pingcap/parser v0.0.0-20190114105451-005df5698910 h1:3kybw5XEJIcAkOQ1t8UuohQu+O3ndC/mRsJZFOEi83U=
|
||||
github.com/pingcap/parser v0.0.0-20190114105451-005df5698910/go.mod h1:1FNvfp9+J0wvc4kl8eGNh7Rqrxveg15jJoWo/a0uHwA=
|
||||
github.com/pingcap/pd v2.1.0-rc.4+incompatible h1:/buwGk04aHO5odk/+O8ZOXGs4qkUjYTJ2UpCJXna8NE=
|
||||
github.com/pingcap/pd v2.1.0-rc.4+incompatible/go.mod h1:nD3+EoYes4+aNNODO99ES59V83MZSI+dFbhyr667a0E=
|
||||
github.com/pingcap/tidb-tools v2.1.3-0.20190104033906-883b07a04a73+incompatible h1:Ba48wwPwPq5hd1kkQpgua49dqB5cthC2zXVo7fUUDec=
|
||||
|
||||
@ -51,7 +51,7 @@ func (b *Builder) ApplyDiff(m *meta.Meta, diff *model.SchemaDiff) ([]int64, erro
|
||||
var oldTableID, newTableID int64
|
||||
tblIDs := make([]int64, 0, 2)
|
||||
switch diff.Type {
|
||||
case model.ActionCreateTable:
|
||||
case model.ActionCreateTable, model.ActionRestoreTable:
|
||||
newTableID = diff.TableID
|
||||
tblIDs = append(tblIDs, newTableID)
|
||||
case model.ActionDropTable, model.ActionDropView:
|
||||
|
||||
19
kv/txn.go
19
kv/txn.go
@ -17,6 +17,7 @@ import (
|
||||
"context"
|
||||
"math"
|
||||
"math/rand"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
@ -123,3 +124,21 @@ func BatchGetValues(txn Transaction, keys []Key) (map[string][]byte, error) {
|
||||
}
|
||||
return storageValues, nil
|
||||
}
|
||||
|
||||
// mockCommitErrorEnable uses to enable `mockCommitError` and only mock error once.
|
||||
var mockCommitErrorEnable = int64(0)
|
||||
|
||||
// MockCommitErrorEnable exports for gofail testing.
|
||||
func MockCommitErrorEnable() {
|
||||
atomic.StoreInt64(&mockCommitErrorEnable, 1)
|
||||
}
|
||||
|
||||
// MockCommitErrorDisable exports for gofail testing.
|
||||
func MockCommitErrorDisable() {
|
||||
atomic.StoreInt64(&mockCommitErrorEnable, 0)
|
||||
}
|
||||
|
||||
// IsMockCommitErrorEnable exports for gofail testing.
|
||||
func IsMockCommitErrorEnable() bool {
|
||||
return atomic.LoadInt64(&mockCommitErrorEnable) == 1
|
||||
}
|
||||
|
||||
11
meta/meta.go
11
meta/meta.go
@ -296,6 +296,17 @@ func (m *Meta) CreateTable(dbID int64, tableInfo *model.TableInfo) error {
|
||||
return m.txn.HSet(dbKey, tableKey, data)
|
||||
}
|
||||
|
||||
// CreateTableAndSetAutoID creates a table with tableInfo in database,
|
||||
// and rebases the table autoID.
|
||||
func (m *Meta) CreateTableAndSetAutoID(dbID int64, tableInfo *model.TableInfo, autoID int64) error {
|
||||
err := m.CreateTable(dbID, tableInfo)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
_, err = m.txn.HInc(m.dbKey(dbID), m.autoTableIDKey(tableInfo.ID), autoID)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// DropDatabase drops whole database.
|
||||
func (m *Meta) DropDatabase(dbID int64) error {
|
||||
// Check if db exists.
|
||||
|
||||
@ -84,6 +84,14 @@ type RecoverIndex struct {
|
||||
IndexName string
|
||||
}
|
||||
|
||||
// RestoreTable is used for recover deleted files by mistake.
|
||||
type RestoreTable struct {
|
||||
baseSchemaProducer
|
||||
JobID int64
|
||||
Table *ast.TableName
|
||||
JobNum int64
|
||||
}
|
||||
|
||||
// CleanupIndex is used to delete dangling index data.
|
||||
type CleanupIndex struct {
|
||||
baseSchemaProducer
|
||||
|
||||
@ -550,6 +550,14 @@ func (b *PlanBuilder) buildAdmin(as *ast.AdminStmt) (Plan, error) {
|
||||
p := &ShowSlow{ShowSlow: as.ShowSlow}
|
||||
p.SetSchema(buildShowSlowSchema())
|
||||
ret = p
|
||||
case ast.AdminRestoreTable:
|
||||
if len(as.JobIDs) > 0 {
|
||||
ret = &RestoreTable{JobID: as.JobIDs[0]}
|
||||
} else if len(as.Tables) > 0 {
|
||||
ret = &RestoreTable{Table: as.Tables[0], JobNum: as.JobNumber}
|
||||
} else {
|
||||
return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
|
||||
}
|
||||
default:
|
||||
return nil, ErrUnsupportedType.GenWithStack("Unsupported ast.AdminStmt(%T) for buildAdmin", as)
|
||||
}
|
||||
|
||||
@ -84,6 +84,11 @@ func (p *preprocessor) Enter(in ast.Node) (out ast.Node, skipChildren bool) {
|
||||
return in, true
|
||||
case *ast.Join:
|
||||
p.checkNonUniqTableAlias(node)
|
||||
case *ast.AdminStmt:
|
||||
// The specified table in admin restore syntax maybe already been dropped.
|
||||
// So skip check table name here, otherwise, admin restore table [table_name] syntax will return
|
||||
// table not exists error. But admin restore is use to restore the dropped table. So skip children here.
|
||||
return in, node.Tp == ast.AdminRestoreTable
|
||||
default:
|
||||
p.parentIsJoin = false
|
||||
}
|
||||
|
||||
@ -169,6 +169,12 @@ func (txn *tikvTxn) Commit(ctx context.Context) error {
|
||||
}
|
||||
defer txn.close()
|
||||
|
||||
// gofail: var mockCommitError bool
|
||||
// if mockCommitError && kv.IsMockCommitErrorEnable() {
|
||||
// kv.MockCommitErrorDisable()
|
||||
// return errors.New("mock commit error")
|
||||
// }
|
||||
|
||||
metrics.TiKVTxnCmdCounter.WithLabelValues("set").Add(float64(txn.setCnt))
|
||||
metrics.TiKVTxnCmdCounter.WithLabelValues("commit").Inc()
|
||||
start := time.Now()
|
||||
|
||||
98
util/gcutil/gcutil.go
Normal file
98
util/gcutil/gcutil.go
Normal file
@ -0,0 +1,98 @@
|
||||
// Copyright 2019 PingCAP, Inc.
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License");
|
||||
// you may not use this file except in compliance with the License.
|
||||
// You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS,
|
||||
// See the License for the specific language governing permissions and
|
||||
// limitations under the License.
|
||||
|
||||
package gcutil
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/pingcap/errors"
|
||||
"github.com/pingcap/parser/model"
|
||||
"github.com/pingcap/tidb/sessionctx"
|
||||
"github.com/pingcap/tidb/sessionctx/variable"
|
||||
"github.com/pingcap/tidb/util"
|
||||
"github.com/pingcap/tidb/util/sqlexec"
|
||||
)
|
||||
|
||||
const (
|
||||
selectVariableValueSQL = `SELECT HIGH_PRIORITY variable_value FROM mysql.tidb WHERE variable_name='%s'`
|
||||
insertVariableValueSQL = `INSERT HIGH_PRIORITY INTO mysql.tidb VALUES ('%[1]s', '%[2]s', '%[3]s')
|
||||
ON DUPLICATE KEY
|
||||
UPDATE variable_value = '%[2]s', comment = '%[3]s'`
|
||||
)
|
||||
|
||||
// CheckGCEnable is use to check whether GC is enable.
|
||||
func CheckGCEnable(ctx sessionctx.Context) (enable bool, err error) {
|
||||
sql := fmt.Sprintf(selectVariableValueSQL, "tikv_gc_enable")
|
||||
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
|
||||
if err != nil {
|
||||
return false, errors.Trace(err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
return false, errors.New("can not get 'tikv_gc_enable'")
|
||||
}
|
||||
return rows[0].GetString(0) == "true", nil
|
||||
}
|
||||
|
||||
// DisableGC will disable GC enable variable.
|
||||
func DisableGC(ctx sessionctx.Context) error {
|
||||
sql := fmt.Sprintf(insertVariableValueSQL, "tikv_gc_enable", "false", "Current GC enable status")
|
||||
_, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// EnableGC will enable GC enable variable.
|
||||
func EnableGC(ctx sessionctx.Context) error {
|
||||
sql := fmt.Sprintf(insertVariableValueSQL, "tikv_gc_enable", "true", "Current GC enable status")
|
||||
_, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
|
||||
return errors.Trace(err)
|
||||
}
|
||||
|
||||
// ValidateSnapshot checks that the newly set snapshot time is after GC safe point time.
|
||||
func ValidateSnapshot(ctx sessionctx.Context, snapshotTS uint64) error {
|
||||
safePointTS, err := GetGCSafePoint(ctx)
|
||||
if err != nil {
|
||||
return errors.Trace(err)
|
||||
}
|
||||
if safePointTS > snapshotTS {
|
||||
return variable.ErrSnapshotTooOld.GenWithStackByArgs(model.TSConvert2Time(safePointTS).String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ValidateSnapshotWithGCSafePoint checks that the newly set snapshot time is after GC safe point time.
|
||||
func ValidateSnapshotWithGCSafePoint(snapshotTS, safePointTS uint64) error {
|
||||
if safePointTS > snapshotTS {
|
||||
return variable.ErrSnapshotTooOld.GenWithStackByArgs(model.TSConvert2Time(safePointTS).String())
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGCSafePoint loads GC safe point time from mysql.tidb.
|
||||
func GetGCSafePoint(ctx sessionctx.Context) (uint64, error) {
|
||||
sql := fmt.Sprintf(selectVariableValueSQL, "tikv_gc_safe_point")
|
||||
rows, _, err := ctx.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(ctx, sql)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
if len(rows) != 1 {
|
||||
return 0, errors.New("can not get 'tikv_gc_safe_point'")
|
||||
}
|
||||
safePointString := rows[0].GetString(0)
|
||||
safePointTime, err := util.CompatibleParseGCTime(safePointString)
|
||||
if err != nil {
|
||||
return 0, errors.Trace(err)
|
||||
}
|
||||
ts := variable.GoTimeToTS(safePointTime)
|
||||
return ts, nil
|
||||
}
|
||||
Reference in New Issue
Block a user