bindinfo: sync concurrent ops on mysql.bind_info from multiple tidb instances (#21629)

This commit is contained in:
Kenan Yao
2020-12-18 03:51:35 +08:00
committed by GitHub
parent efc0759961
commit 31673c8e92
5 changed files with 128 additions and 83 deletions

View File

@ -139,7 +139,7 @@ func (s *testSuite) TearDownTest(c *C) {
}
func (s *testSuite) cleanBindingEnv(tk *testkit.TestKit) {
tk.MustExec("truncate table mysql.bind_info")
tk.MustExec("delete from mysql.bind_info where source != 'builtin'")
s.domain.BindHandle().Clear()
}
@ -1304,7 +1304,7 @@ func (s *testSuite) TestEvolveInvalidBindings(c *C) {
// Manufacture a rejected binding by hacking mysql.bind_info.
tk.MustExec("insert into mysql.bind_info values('select * from t where a > ?', 'select /*+ USE_INDEX(t,idx_a) */ * from t where a > 10', 'test', 'rejected', '2000-01-01 09:00:00', '2000-01-01 09:00:00', '', '','" +
bindinfo.Manual + "')")
tk.MustQuery("select bind_sql, status from mysql.bind_info").Sort().Check(testkit.Rows(
tk.MustQuery("select bind_sql, status from mysql.bind_info where source != 'builtin'").Sort().Check(testkit.Rows(
"select /*+ USE_INDEX(t) */ * from t where a > 10 using",
"select /*+ USE_INDEX(t,idx_a) */ * from t where a > 10 rejected",
))

View File

@ -42,6 +42,8 @@ const (
Capture = "capture"
// Evolve indicates the binding is evolved by TiDB from old bindings.
Evolve = "evolve"
// Builtin indicates the binding is a builtin record for internal locking purpose. It is also the status for the builtin binding.
Builtin = "builtin"
)
// Binding stores the basic bind hint info.

View File

@ -33,7 +33,6 @@ import (
"github.com/pingcap/tidb/metrics"
"github.com/pingcap/tidb/sessionctx"
"github.com/pingcap/tidb/sessionctx/variable"
"github.com/pingcap/tidb/store/tikv/oracle"
"github.com/pingcap/tidb/types"
driver "github.com/pingcap/tidb/types/parser_driver"
"github.com/pingcap/tidb/util/chunk"
@ -94,6 +93,8 @@ const (
OwnerKey = "/tidb/bindinfo/owner"
// Prompt is the prompt for bindinfo owner manager.
Prompt = "bindinfo"
// BuiltinPseudoSQL4BindLock is used to simulate LOCK TABLE for mysql.bind_info.
BuiltinPseudoSQL4BindLock = "builtin_pseudo_sql_for_bind_lock"
)
type bindRecordUpdate struct {
@ -123,7 +124,6 @@ func NewBindHandle(ctx sessionctx.Context) *BindHandle {
func (h *BindHandle) Update(fullLoad bool) (err error) {
h.bindInfo.Lock()
lastUpdateTime := h.bindInfo.lastUpdateTime
h.bindInfo.Unlock()
sql := "select original_sql, bind_sql, default_db, status, create_time, update_time, charset, collation, source from mysql.bind_info"
if !fullLoad {
@ -136,11 +136,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
// uses another background session.
rows, _, err := h.sctx.Context.(sqlexec.RestrictedSQLExecutor).ExecRestrictedSQL(sql)
if err != nil {
h.bindInfo.Unlock()
return err
}
// Make sure there is only one goroutine writes the cache.
h.bindInfo.Lock()
newCache := h.bindInfo.Value.Load().(cache).copy()
defer func() {
h.bindInfo.lastUpdateTime = lastUpdateTime
@ -149,6 +148,10 @@ func (h *BindHandle) Update(fullLoad bool) (err error) {
}()
for _, row := range rows {
// Skip the builtin record which is designed for binding synchronization.
if row.GetString(0) == BuiltinPseudoSQL4BindLock {
continue
}
hash, meta, err := h.newBindRecord(row)
if err != nil {
logutil.BgLogger().Debug("[sql-bind] failed to generate bind record from data row", zap.Error(err))
@ -179,65 +182,51 @@ func (h *BindHandle) CreateBindRecord(sctx sessionctx.Context, record *BindRecor
return err
}
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.bindInfo.Lock()
h.sctx.Lock()
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN")
if err != nil {
defer func() {
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return
}
normalizedSQL := parser.DigestNormalized(record.OriginalSQL)
oldRecord := h.GetBindRecord(normalizedSQL, record.OriginalSQL, record.Db)
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
h.sctx.Unlock()
terror.Log(err1)
return
}
_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
h.sctx.Unlock()
if err != nil {
return
}
// Make sure there is only one goroutine writes the cache and uses parser.
h.bindInfo.Lock()
if oldRecord != nil {
h.removeBindRecord(normalizedSQL, oldRecord)
}
h.appendBindRecord(normalizedSQL, record)
h.bindInfo.Unlock()
sqlDigest := parser.DigestNormalized(record.OriginalSQL)
h.setBindRecord(sqlDigest, record)
}()
var txn kv.Transaction
txn, err = h.sctx.Context.Txn(true)
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return err
}
// Binding recreation should physically delete previous bindings.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, ""))
if err != nil {
return err
}
now := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3)
if oldRecord != nil {
for _, binding := range oldRecord.Bindings {
// Binding recreation should physically delete previous bindings, since marking them as deleted may
// cause unexpected binding caches if there are concurrent CREATE BINDING on multiple tidb instances,
// because the record with `using` status is not guaranteed to have larger update_time than those records
// with `deleted` status.
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, binding.BindSQL))
if err != nil {
return err
}
}
}
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
for i := range record.Bindings {
record.Bindings[i].CreateTime = now
record.Bindings[i].UpdateTime = now
// insert the BindRecord to the storage.
// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
if err != nil {
return err
@ -267,40 +256,37 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}
}
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.bindInfo.Lock()
h.sctx.Lock()
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN")
if err != nil {
defer func() {
h.sctx.Unlock()
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return
}
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
h.sctx.Unlock()
terror.Log(err1)
return
}
_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
h.sctx.Unlock()
if err != nil {
return
}
// Make sure there is only one goroutine writes the cache and uses parser.
h.bindInfo.Lock()
h.appendBindRecord(parser.DigestNormalized(record.OriginalSQL), record)
h.bindInfo.Unlock()
}()
var txn kv.Transaction
txn, err = h.sctx.Context.Txn(true)
if err != nil {
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return err
}
if duplicateBinding != nil {
_, err = exec.ExecuteInternal(context.TODO(), h.deleteBindInfoSQL(record.OriginalSQL, record.Db, duplicateBinding.BindSQL))
if err != nil {
@ -308,7 +294,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}
}
now := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3)
now := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
for i := range record.Bindings {
if duplicateBinding != nil {
record.Bindings[i].CreateTime = duplicateBinding.CreateTime
@ -317,7 +303,7 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
}
record.Bindings[i].UpdateTime = now
// insert the BindRecord to the storage.
// Insert the BindRecord to the storage.
_, err = exec.ExecuteInternal(context.TODO(), h.insertBindInfoSQL(record.OriginalSQL, record.Db, record.Bindings[i]))
if err != nil {
return err
@ -328,25 +314,27 @@ func (h *BindHandle) AddBindRecord(sctx sessionctx.Context, record *BindRecord)
// DropBindRecord drops a BindRecord to the storage and BindRecord int the cache.
func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (err error) {
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
h.bindInfo.Lock()
h.sctx.Lock()
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN")
if err != nil {
defer func() {
h.sctx.Unlock()
return
h.bindInfo.Unlock()
}()
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
if err != nil {
return err
}
var deleteRows int
defer func() {
if err != nil {
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
h.sctx.Unlock()
terror.Log(err1)
return
}
_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
h.sctx.Unlock()
if err != nil {
if err != nil || deleteRows == 0 {
return
}
@ -354,18 +342,15 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
if binding != nil {
record.Bindings = append(record.Bindings, *binding)
}
// Make sure there is only one goroutine writes the cache and uses parser.
h.bindInfo.Lock()
h.removeBindRecord(parser.DigestNormalized(originalSQL), record)
h.bindInfo.Unlock()
}()
txn, err1 := h.sctx.Context.Txn(true)
if err1 != nil {
return err1
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
if err = h.lockBindInfoTable(); err != nil {
return
}
updateTs := types.NewTime(types.FromGoTime(oracle.GetTimeFromTS(txn.StartTS())), mysql.TypeTimestamp, 3)
updateTs := types.NewTime(types.FromGoTime(time.Now()), mysql.TypeTimestamp, 3)
bindSQL := ""
if binding != nil {
@ -373,6 +358,20 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
}
_, err = exec.ExecuteInternal(context.TODO(), h.logicalDeleteBindInfoSQL(originalSQL, db, updateTs, bindSQL))
deleteRows = int(h.sctx.Context.GetSessionVars().StmtCtx.AffectedRows())
return err
}
// lockBindInfoTable simulates `LOCK TABLE mysql.bind_info WRITE` by acquiring a pessimistic lock on a
// special builtin row of mysql.bind_info. Note that this function must be called with h.sctx.Lock() held.
// We can replace this implementation to normal `LOCK TABLE mysql.bind_info WRITE` if that feature is
// generally available later.
// This lock would enforce the CREATE / DROP GLOBAL BINDING statements to be executed sequentially,
// even if they come from different tidb instances.
func (h *BindHandle) lockBindInfoTable() error {
// h.sctx already locked.
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
_, err := exec.ExecuteInternal(context.TODO(), h.lockBindInfoSQL())
return err
}
@ -483,6 +482,16 @@ func (h *BindHandle) newBindRecord(row chunk.Row) (string, *BindRecord, error) {
return hash, bindRecord, err
}
// setBindRecord sets the BindRecord to the cache, if there already exists a BindRecord,
// it will be overridden.
func (h *BindHandle) setBindRecord(hash string, meta *BindRecord) {
newCache := h.bindInfo.Value.Load().(cache).copy()
oldRecord := newCache.getBindRecord(hash, meta.OriginalSQL, meta.Db)
newCache.setBindRecord(hash, meta)
h.bindInfo.Value.Store(newCache)
updateMetrics(metrics.ScopeGlobal, oldRecord, meta, false)
}
// appendBindRecord addes the BindRecord to the cache, all the stale BindRecords are
// removed from the cache after this operation.
func (h *BindHandle) appendBindRecord(hash string, meta *BindRecord) {
@ -565,12 +574,15 @@ func (c cache) getBindRecord(hash, normdOrigSQL, db string) *BindRecord {
}
func (h *BindHandle) deleteBindInfoSQL(normdOrigSQL, db, bindSQL string) string {
return fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s AND bind_sql=%s`,
sql := fmt.Sprintf(
`DELETE FROM mysql.bind_info WHERE original_sql=%s AND default_db=%s`,
expression.Quote(normdOrigSQL),
expression.Quote(db),
expression.Quote(bindSQL),
)
if bindSQL == "" {
return sql
}
return sql + fmt.Sprintf(` and bind_sql = %s`, expression.Quote(bindSQL))
}
func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Binding) string {
@ -587,12 +599,21 @@ func (h *BindHandle) insertBindInfoSQL(orignalSQL string, db string, info Bindin
)
}
// lockBindInfoSQL simulates LOCK TABLE by updating a same row in each pessimistic transaction.
func (h *BindHandle) lockBindInfoSQL() string {
return fmt.Sprintf("UPDATE mysql.bind_info SET source=%s WHERE original_sql=%s",
expression.Quote(Builtin),
expression.Quote(BuiltinPseudoSQL4BindLock))
}
func (h *BindHandle) logicalDeleteBindInfoSQL(originalSQL, db string, updateTs types.Time, bindingSQL string) string {
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s`,
updateTsStr := updateTs.String()
sql := fmt.Sprintf(`UPDATE mysql.bind_info SET status=%s,update_time=%s WHERE original_sql=%s and default_db=%s and update_time<%s`,
expression.Quote(deleted),
expression.Quote(updateTs.String()),
expression.Quote(updateTsStr),
expression.Quote(originalSQL),
expression.Quote(db))
expression.Quote(db),
expression.Quote(updateTsStr))
if bindingSQL == "" {
return sql
}

View File

@ -31,6 +31,7 @@ import (
"github.com/pingcap/parser/auth"
"github.com/pingcap/parser/mysql"
"github.com/pingcap/parser/terror"
"github.com/pingcap/tidb/bindinfo"
"github.com/pingcap/tidb/config"
"github.com/pingcap/tidb/ddl"
"github.com/pingcap/tidb/domain"
@ -234,15 +235,15 @@ const (
// CreateBindInfoTable stores the sql bind info which is used to update globalBindCache.
CreateBindInfoTable = `CREATE TABLE IF NOT EXISTS mysql.bind_info (
original_sql TEXT NOT NULL ,
bind_sql TEXT NOT NULL ,
default_db TEXT NOT NULL,
status TEXT NOT NULL,
create_time TIMESTAMP(3) NOT NULL,
update_time TIMESTAMP(3) NOT NULL,
charset TEXT NOT NULL,
collation TEXT NOT NULL,
source VARCHAR(10) NOT NULL DEFAULT 'unknown',
original_sql TEXT NOT NULL,
bind_sql TEXT NOT NULL,
default_db TEXT NOT NULL,
status TEXT NOT NULL,
create_time TIMESTAMP(3) NOT NULL,
update_time TIMESTAMP(3) NOT NULL,
charset TEXT NOT NULL,
collation TEXT NOT NULL,
source VARCHAR(10) NOT NULL DEFAULT 'unknown',
INDEX sql_index(original_sql(1024),default_db(1024)) COMMENT "accelerate the speed when add global binding query",
INDEX time_index(update_time) COMMENT "accelerate the speed when querying with last update time"
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_bin;`
@ -437,6 +438,8 @@ const (
version55 = 55
// version56 fixes the bug that upgradeToVer49 would be missed when upgrading from v4.0 to a new version
version56 = 56
// version57 fixes the bug of concurrent create / drop binding
version57 = 57
)
var (
@ -497,6 +500,7 @@ var (
upgradeToVer54,
upgradeToVer55,
upgradeToVer56,
upgradeToVer57,
}
)
@ -1228,6 +1232,24 @@ func upgradeToVer56(s Session, ver int64) {
doReentrantDDL(s, CreateStatsExtended)
}
func upgradeToVer57(s Session, ver int64) {
if ver >= version57 {
return
}
insertBuiltinBindInfoRow(s)
}
func initBindInfoTable(s Session) {
mustExecute(s, CreateBindInfoTable)
insertBuiltinBindInfoRow(s)
}
func insertBuiltinBindInfoRow(s Session) {
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO mysql.bind_info VALUES ("%s", "%s", "mysql", "%s", "0000-00-00 00:00:00", "0000-00-00 00:00:00", "", "", "%s")`,
bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.BuiltinPseudoSQL4BindLock, bindinfo.Builtin, bindinfo.Builtin)
mustExecute(s, sql)
}
func writeMemoryQuotaQuery(s Session) {
comment := "memory_quota_query is 32GB by default in v3.0.x, 1GB by default in v4.0.x"
sql := fmt.Sprintf(`INSERT HIGH_PRIORITY INTO %s.%s VALUES ("%s", '%d', '%s') ON DUPLICATE KEY UPDATE VARIABLE_VALUE='%d'`,
@ -1291,7 +1313,7 @@ func doDDLWorks(s Session) {
// Create default_roles table.
mustExecute(s, CreateDefaultRolesTable)
// Create bind_info table.
mustExecute(s, CreateBindInfoTable)
initBindInfoTable(s)
// Create stats_topn_store table.
mustExecute(s, CreateStatsTopNTable)
// Create expr_pushdown_blacklist table.

View File

@ -2133,7 +2133,7 @@ func CreateSessionWithDomain(store kv.Storage, dom *domain.Domain) (*session, er
const (
notBootstrapped = 0
currentBootstrapVersion = version56
currentBootstrapVersion = version57
)
func getStoreBootstrapVersion(store kv.Storage) int64 {