*: save gc_delete_range items that is done in another table instead of deleting them. (#6512)

This commit is contained in:
MyonKeminta
2018-05-14 20:09:15 +08:00
committed by tiancaiamao
parent f0e98c75b5
commit 10e57a5cef
7 changed files with 45 additions and 13 deletions

View File

@ -88,7 +88,6 @@ var (
errTooLongKey = terror.ClassDDL.New(codeTooLongKey,
fmt.Sprintf("Specified key was too long; max key length is %d bytes", maxPrefixLength))
errKeyColumnDoesNotExits = terror.ClassDDL.New(codeKeyColumnDoesNotExits, "this key column doesn't exist in table")
errDupKeyName = terror.ClassDDL.New(codeDupKeyName, "duplicate key name")
errUnknownTypeLength = terror.ClassDDL.New(codeUnknownTypeLength, "Unknown length for type tp %d")
errUnknownFractionLength = terror.ClassDDL.New(codeUnknownFractionLength, "Unknown Length for type tp %d and fraction %d")
errInvalidJobVersion = terror.ClassDDL.New(codeInvalidJobVersion, "DDL job with version %d greater than current %d")
@ -114,6 +113,8 @@ var (
errBlobCantHaveDefault = terror.ClassDDL.New(codeBlobCantHaveDefault, mysql.MySQLErrName[mysql.ErrBlobCantHaveDefault])
errTooLongIndexComment = terror.ClassDDL.New(codeErrTooLongIndexComment, mysql.MySQLErrName[mysql.ErrTooLongIndexComment])
// ErrDupKeyName returns for duplicated key name
ErrDupKeyName = terror.ClassDDL.New(codeDupKeyName, "duplicate key name")
// ErrInvalidDBState returns for invalid database state.
ErrInvalidDBState = terror.ClassDDL.New(codeInvalidDBState, "invalid database state")
// ErrInvalidTableState returns for invalid Table state.

View File

@ -545,7 +545,7 @@ func checkDuplicateConstraint(namesMap map[string]bool, name string, foreign boo
if foreign {
return infoschema.ErrCannotAddForeign
}
return errDupKeyName.Gen("duplicate key name %s", name)
return ErrDupKeyName.Gen("duplicate key name %s", name)
}
namesMap[nameLower] = true
return nil
@ -1677,7 +1677,7 @@ func (d *ddl) CreateIndex(ctx sessionctx.Context, ti ast.Ident, unique bool, ind
}
if indexInfo := findIndexByName(indexName.L, t.Meta().Indices); indexInfo != nil {
return errDupKeyName.Gen("index already exist %s", indexName)
return ErrDupKeyName.Gen("index already exist %s", indexName)
}
if err = checkTooLongIndex(indexName); err != nil {

View File

@ -207,7 +207,7 @@ func (d *ddl) onCreateIndex(t *meta.Meta, job *model.Job) (ver int64, err error)
indexInfo := findIndexByName(indexName.L, tblInfo.Indices)
if indexInfo != nil && indexInfo.State == model.StatePublic {
job.State = model.JobStateCancelled
return ver, errDupKeyName.Gen("index already exist %s", indexName)
return ver, ErrDupKeyName.Gen("index already exist %s", indexName)
}
if indexInfo == nil {

View File

@ -27,9 +27,10 @@ import (
)
const (
loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.gc_delete_range WHERE ts < %v ORDER BY ts`
completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"`
loadDeleteRangeSQL = `SELECT HIGH_PRIORITY job_id, element_id, start_key, end_key FROM mysql.gc_delete_range WHERE ts < %v`
recordDoneDeletedRangeSQL = `INSERT IGNORE INTO mysql.gc_delete_range_done SELECT * FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
completeDeleteRangeSQL = `DELETE FROM mysql.gc_delete_range WHERE job_id = %d AND element_id = %d`
updateDeleteRangeSQL = `UPDATE mysql.gc_delete_range SET start_key = "%s" WHERE job_id = %d AND element_id = %d AND start_key = "%s"`
)
// DelRangeTask is for run delete-range command in gc_worker.
@ -86,11 +87,17 @@ func LoadDeleteRanges(ctx sessionctx.Context, safePoint uint64) (ranges []DelRan
return ranges, nil
}
// CompleteDeleteRange deletes a record from gc_delete_range table.
// CompleteDeleteRange moves a record from gc_delete_range table to gc_delete_range_done table.
// NOTE: This function WILL NOT start and run in a new transaction internally.
func CompleteDeleteRange(ctx sessionctx.Context, dr DelRangeTask) error {
sql := fmt.Sprintf(completeDeleteRangeSQL, dr.JobID, dr.ElementID)
sql := fmt.Sprintf(recordDoneDeletedRangeSQL, dr.JobID, dr.ElementID)
_, err := ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
if err != nil {
return errors.Trace(err)
}
sql = fmt.Sprintf(completeDeleteRangeSQL, dr.JobID, dr.ElementID)
_, err = ctx.(sqlexec.SQLExecutor).Execute(context.TODO(), sql)
return errors.Trace(err)
}

View File

@ -240,7 +240,7 @@ func (s *testSuite) TestAggregation(c *C) {
result = tk.MustQuery("select count(*) from information_schema.columns")
// When adding new memory columns in information_schema, please update this variable.
columnCountOfAllInformationSchemaTables := "741"
columnCountOfAllInformationSchemaTables := "746"
result.Check(testkit.Rows(columnCountOfAllInformationSchemaTables))
tk.MustExec("drop table if exists t1")

View File

@ -185,8 +185,17 @@ const (
start_key VARCHAR(255) NOT NULL COMMENT "encoded in hex",
end_key VARCHAR(255) NOT NULL COMMENT "encoded in hex",
ts BIGINT NOT NULL COMMENT "timestamp in int64",
UNIQUE KEY (element_id),
KEY (job_id, element_id)
UNIQUE KEY delete_range_index (job_id, element_id)
);`
// CreateGCDeleteRangeDoneTable stores schemas which are already deleted by DeleteRange.
CreateGCDeleteRangeDoneTable = `CREATE TABLE IF NOT EXISTS mysql.gc_delete_range_done (
job_id BIGINT NOT NULL COMMENT "the DDL job ID",
element_id BIGINT NOT NULL COMMENT "the schema element ID",
start_key VARCHAR(255) NOT NULL COMMENT "encoded in hex",
end_key VARCHAR(255) NOT NULL COMMENT "encoded in hex",
ts BIGINT NOT NULL COMMENT "timestamp in int64",
UNIQUE KEY delete_range_done_index (job_id, element_id)
);`
// CreateStatsFeedbackTable stores the feedback info which is used to update stats.
@ -243,6 +252,7 @@ const (
version18 = 18
version19 = 19
version20 = 20
version21 = 21
)
func checkBootstrapped(s Session) (bool, error) {
@ -381,6 +391,10 @@ func upgrade(s Session) {
upgradeToVer20(s)
}
if ver < version21 {
upgradeToVer21(s)
}
updateBootstrapVer(s)
_, err = s.Execute(context.Background(), "COMMIT")
@ -607,6 +621,14 @@ func upgradeToVer20(s Session) {
doReentrantDDL(s, CreateStatsFeedbackTable)
}
func upgradeToVer21(s Session) {
mustExecute(s, CreateGCDeleteRangeDoneTable)
doReentrantDDL(s, "ALTER TABLE mysql.gc_delete_range DROP INDEX job_id", ddl.ErrCantDropFieldOrKey)
doReentrantDDL(s, "ALTER TABLE mysql.gc_delete_range ADD UNIQUE INDEX delete_range_index (job_id, element_id)", ddl.ErrDupKeyName)
doReentrantDDL(s, "ALTER TABLE mysql.gc_delete_range DROP INDEX element_id", ddl.ErrCantDropFieldOrKey)
}
// updateBootstrapVer updates bootstrap version variable in mysql.TiDB table.
func updateBootstrapVer(s Session) {
// Update bootstrap version.
@ -653,6 +675,8 @@ func doDDLWorks(s Session) {
mustExecute(s, CreateStatsBucketsTable)
// Create gc_delete_range table.
mustExecute(s, CreateGCDeleteRangeTable)
// Create gc_delete_range_done table.
mustExecute(s, CreateGCDeleteRangeDoneTable)
// Create stats_feedback table.
mustExecute(s, CreateStatsFeedbackTable)
}

View File

@ -1210,7 +1210,7 @@ func createSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
const (
notBootstrapped = 0
currentBootstrapVersion = 20
currentBootstrapVersion = 21
)
func getStoreBootstrapVersion(store kv.Storage) int64 {