DDLs that doesn't modify rows like create/drop/truncate table, create/drop database do not need to wait 2 lease on ddl worker, we can safely remove those wait to speed up DDL. But user may create a table, then insert data on that table on another TiDB server, So we need to wait in session. User can set a session variable to skip DDL wait if needed, and there is no risk of data inconsistency. This solution remove the need for user to set lease to zero when loading data, It's much more safe and easy to use.
236 lines
7.3 KiB
Go
236 lines
7.3 KiB
Go
// Copyright 2016 PingCAP, Inc.
|
|
//
|
|
// Licensed under the Apache License, Version 2.0 (the "License");
|
|
// you may not use this file except in compliance with the License.
|
|
// You may obtain a copy of the License at
|
|
//
|
|
// http://www.apache.org/licenses/LICENSE-2.0
|
|
//
|
|
// Unless required by applicable law or agreed to in writing, software
|
|
// distributed under the License is distributed on an "AS IS" BASIS,
|
|
// See the License for the specific language governing permissions and
|
|
// limitations under the License.
|
|
|
|
package executor
|
|
|
|
import (
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/juju/errors"
|
|
"github.com/pingcap/tidb/ast"
|
|
"github.com/pingcap/tidb/context"
|
|
"github.com/pingcap/tidb/expression"
|
|
"github.com/pingcap/tidb/infoschema"
|
|
"github.com/pingcap/tidb/model"
|
|
"github.com/pingcap/tidb/mysql"
|
|
"github.com/pingcap/tidb/privilege"
|
|
"github.com/pingcap/tidb/sessionctx"
|
|
"github.com/pingcap/tidb/sessionctx/variable"
|
|
"github.com/pingcap/tidb/sessionctx/varsutil"
|
|
"github.com/pingcap/tidb/terror"
|
|
"github.com/pingcap/tidb/util/types"
|
|
)
|
|
|
|
// DDLExec represents a DDL executor.
|
|
// It grabs a DDL instance from Domain, calling the DDL methods to do the work.
|
|
type DDLExec struct {
|
|
Statement ast.StmtNode
|
|
ctx context.Context
|
|
is infoschema.InfoSchema
|
|
done bool
|
|
}
|
|
|
|
// Schema implements the Executor Schema interface.
|
|
func (e *DDLExec) Schema() expression.Schema {
|
|
return expression.NewSchema(nil)
|
|
}
|
|
|
|
// Next implements Execution Next interface.
|
|
func (e *DDLExec) Next() (*Row, error) {
|
|
if e.done {
|
|
return nil, nil
|
|
}
|
|
// For create/drop database, create/drop/truncate table
|
|
// DDL worker do not wait 2 lease, so we need to wait in executor to make sure
|
|
// all TiDB server has updated the schema.
|
|
var needWait bool
|
|
var err error
|
|
switch x := e.Statement.(type) {
|
|
case *ast.TruncateTableStmt:
|
|
err = e.executeTruncateTable(x)
|
|
needWait = true
|
|
case *ast.CreateDatabaseStmt:
|
|
err = e.executeCreateDatabase(x)
|
|
needWait = true
|
|
case *ast.CreateTableStmt:
|
|
err = e.executeCreateTable(x)
|
|
needWait = true
|
|
case *ast.CreateIndexStmt:
|
|
err = e.executeCreateIndex(x)
|
|
case *ast.DropDatabaseStmt:
|
|
err = e.executeDropDatabase(x)
|
|
needWait = true
|
|
case *ast.DropTableStmt:
|
|
err = e.executeDropTable(x)
|
|
needWait = true
|
|
case *ast.DropIndexStmt:
|
|
err = e.executeDropIndex(x)
|
|
case *ast.AlterTableStmt:
|
|
err = e.executeAlterTable(x)
|
|
}
|
|
if err != nil {
|
|
return nil, errors.Trace(err)
|
|
}
|
|
if e.ctx.GetSessionVars().SkipDDLWait {
|
|
needWait = false
|
|
}
|
|
|
|
dom := sessionctx.GetDomain(e.ctx)
|
|
if needWait {
|
|
time.Sleep(dom.DDL().GetLease() * 2)
|
|
}
|
|
|
|
// Update InfoSchema in TxnCtx, so it will pass schema check.
|
|
is := dom.InfoSchema()
|
|
txnCtx := e.ctx.GetSessionVars().TxnCtx
|
|
txnCtx.InfoSchema = is
|
|
txnCtx.SchemaVersion = is.SchemaMetaVersion()
|
|
// DDL will force commit old transaction, after DDL, in transaction status should be false.
|
|
e.ctx.GetSessionVars().SetStatusFlag(mysql.ServerStatusInTrans, false)
|
|
e.done = true
|
|
return nil, nil
|
|
}
|
|
|
|
// Close implements the Executor Close interface.
|
|
func (e *DDLExec) Close() error {
|
|
return nil
|
|
}
|
|
|
|
func (e *DDLExec) executeTruncateTable(s *ast.TruncateTableStmt) error {
|
|
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
|
|
err := sessionctx.GetDomain(e.ctx).DDL().TruncateTable(e.ctx, ident)
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
func (e *DDLExec) executeCreateDatabase(s *ast.CreateDatabaseStmt) error {
|
|
var opt *ast.CharsetOpt
|
|
if len(s.Options) != 0 {
|
|
opt = &ast.CharsetOpt{}
|
|
for _, val := range s.Options {
|
|
switch val.Tp {
|
|
case ast.DatabaseOptionCharset:
|
|
opt.Chs = val.Value
|
|
case ast.DatabaseOptionCollate:
|
|
opt.Col = val.Value
|
|
}
|
|
}
|
|
}
|
|
err := sessionctx.GetDomain(e.ctx).DDL().CreateSchema(e.ctx, model.NewCIStr(s.Name), opt)
|
|
if err != nil {
|
|
if terror.ErrorEqual(err, infoschema.ErrDatabaseExists) && s.IfNotExists {
|
|
err = nil
|
|
}
|
|
}
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
func (e *DDLExec) executeCreateTable(s *ast.CreateTableStmt) error {
|
|
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
|
|
err := sessionctx.GetDomain(e.ctx).DDL().CreateTable(e.ctx, ident, s.Cols, s.Constraints, s.Options)
|
|
if terror.ErrorEqual(err, infoschema.ErrTableExists) {
|
|
if s.IfNotExists {
|
|
return nil
|
|
}
|
|
return err
|
|
}
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
func (e *DDLExec) executeCreateIndex(s *ast.CreateIndexStmt) error {
|
|
ident := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
|
|
err := sessionctx.GetDomain(e.ctx).DDL().CreateIndex(e.ctx, ident, s.Unique, model.NewCIStr(s.IndexName), s.IndexColNames)
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
func (e *DDLExec) executeDropDatabase(s *ast.DropDatabaseStmt) error {
|
|
dbName := model.NewCIStr(s.Name)
|
|
err := sessionctx.GetDomain(e.ctx).DDL().DropSchema(e.ctx, dbName)
|
|
if terror.ErrorEqual(err, infoschema.ErrDatabaseNotExists) {
|
|
if s.IfExists {
|
|
err = nil
|
|
} else {
|
|
err = infoschema.ErrDatabaseDropExists.GenByArgs(s.Name)
|
|
}
|
|
}
|
|
sessionVars := e.ctx.GetSessionVars()
|
|
if err == nil && strings.ToLower(sessionVars.CurrentDB) == dbName.L {
|
|
sessionVars.CurrentDB = ""
|
|
err = varsutil.SetSystemVar(sessionVars, variable.CharsetDatabase, types.NewStringDatum("utf8"))
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
err = varsutil.SetSystemVar(sessionVars, variable.CollationDatabase, types.NewStringDatum("utf8_unicode_ci"))
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
func (e *DDLExec) executeDropTable(s *ast.DropTableStmt) error {
|
|
var notExistTables []string
|
|
for _, tn := range s.Tables {
|
|
fullti := ast.Ident{Schema: tn.Schema, Name: tn.Name}
|
|
schema, ok := e.is.SchemaByName(tn.Schema)
|
|
if !ok {
|
|
// TODO: we should return special error for table not exist, checking "not exist" is not enough,
|
|
// because some other errors may contain this error string too.
|
|
notExistTables = append(notExistTables, fullti.String())
|
|
continue
|
|
}
|
|
tb, err := e.is.TableByName(tn.Schema, tn.Name)
|
|
if err != nil && infoschema.ErrTableNotExists.Equal(err) {
|
|
notExistTables = append(notExistTables, fullti.String())
|
|
continue
|
|
} else if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
// Check Privilege
|
|
privChecker := privilege.GetPrivilegeChecker(e.ctx)
|
|
hasPriv, err := privChecker.Check(e.ctx, schema, tb.Meta(), mysql.DropPriv)
|
|
if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
if !hasPriv {
|
|
return errors.Errorf("You do not have the privilege to drop table %s.%s.", tn.Schema, tn.Name)
|
|
}
|
|
|
|
err = sessionctx.GetDomain(e.ctx).DDL().DropTable(e.ctx, fullti)
|
|
if infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err) {
|
|
notExistTables = append(notExistTables, fullti.String())
|
|
} else if err != nil {
|
|
return errors.Trace(err)
|
|
}
|
|
}
|
|
if len(notExistTables) > 0 && !s.IfExists {
|
|
return infoschema.ErrTableDropExists.GenByArgs(strings.Join(notExistTables, ","))
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (e *DDLExec) executeDropIndex(s *ast.DropIndexStmt) error {
|
|
ti := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
|
|
err := sessionctx.GetDomain(e.ctx).DDL().DropIndex(e.ctx, ti, model.NewCIStr(s.IndexName))
|
|
if (infoschema.ErrDatabaseNotExists.Equal(err) || infoschema.ErrTableNotExists.Equal(err)) && s.IfExists {
|
|
err = nil
|
|
}
|
|
return errors.Trace(err)
|
|
}
|
|
|
|
func (e *DDLExec) executeAlterTable(s *ast.AlterTableStmt) error {
|
|
ti := ast.Ident{Schema: s.Table.Schema, Name: s.Table.Name}
|
|
err := sessionctx.GetDomain(e.ctx).DDL().AlterTable(e.ctx, ti, s.Specs)
|
|
return errors.Trace(err)
|
|
}
|