bindinfo: garbage collect deleted bind records (#26206)
This commit is contained in:
@ -2160,3 +2160,40 @@ func (s *testSuite) TestBindingLastUpdateTime(c *C) {
|
||||
c.Assert(updateTime2, Equals, updateTime)
|
||||
tk.MustQuery(`show global status like 'last_plan_binding_update_time';`).Check(testkit.Rows())
|
||||
}
|
||||
|
||||
func (s *testSuite) TestGCBindRecord(c *C) {
|
||||
tk := testkit.NewTestKit(c, s.store)
|
||||
s.cleanBindingEnv(tk)
|
||||
tk.MustExec("use test")
|
||||
tk.MustExec("drop table if exists t")
|
||||
tk.MustExec("create table t(a int, b int, key(a))")
|
||||
|
||||
tk.MustExec("create global binding for select * from t where a = 1 using select * from t use index(a) where a = 1")
|
||||
rows := tk.MustQuery("show global bindings").Rows()
|
||||
c.Assert(len(rows), Equals, 1)
|
||||
c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` = ?")
|
||||
c.Assert(rows[0][3], Equals, "using")
|
||||
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows(
|
||||
"using",
|
||||
))
|
||||
|
||||
h := s.domain.BindHandle()
|
||||
// bindinfo.Lease is set to 0 for test env in SetUpSuite.
|
||||
c.Assert(h.GCBindRecord(), IsNil)
|
||||
rows = tk.MustQuery("show global bindings").Rows()
|
||||
c.Assert(len(rows), Equals, 1)
|
||||
c.Assert(rows[0][0], Equals, "select * from `test` . `t` where `a` = ?")
|
||||
c.Assert(rows[0][3], Equals, "using")
|
||||
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows(
|
||||
"using",
|
||||
))
|
||||
|
||||
tk.MustExec("drop global binding for select * from t where a = 1")
|
||||
tk.MustQuery("show global bindings").Check(testkit.Rows())
|
||||
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows(
|
||||
"deleted",
|
||||
))
|
||||
c.Assert(h.GCBindRecord(), IsNil)
|
||||
tk.MustQuery("show global bindings").Check(testkit.Rows())
|
||||
tk.MustQuery("select status from mysql.bind_info where original_sql = 'select * from `test` . `t` where `a` = ?'").Check(testkit.Rows())
|
||||
}
|
||||
|
||||
@ -390,6 +390,45 @@ func (h *BindHandle) DropBindRecord(originalSQL, db string, binding *Binding) (e
|
||||
return err
|
||||
}
|
||||
|
||||
// GCBindRecord physically removes the deleted bind records in mysql.bind_info.
|
||||
func (h *BindHandle) GCBindRecord() (err error) {
|
||||
h.bindInfo.Lock()
|
||||
h.sctx.Lock()
|
||||
defer func() {
|
||||
h.sctx.Unlock()
|
||||
h.bindInfo.Unlock()
|
||||
}()
|
||||
exec, _ := h.sctx.Context.(sqlexec.SQLExecutor)
|
||||
_, err = exec.ExecuteInternal(context.TODO(), "BEGIN PESSIMISTIC")
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
_, err1 := exec.ExecuteInternal(context.TODO(), "ROLLBACK")
|
||||
terror.Log(err1)
|
||||
return
|
||||
}
|
||||
|
||||
_, err = exec.ExecuteInternal(context.TODO(), "COMMIT")
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
}()
|
||||
|
||||
// Lock mysql.bind_info to synchronize with CreateBindRecord / AddBindRecord / DropBindRecord on other tidb instances.
|
||||
if err = h.lockBindInfoTable(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// To make sure that all the deleted bind records have been acknowledged to all tidb,
|
||||
// we only garbage collect those records with update_time before 10 leases.
|
||||
updateTime := time.Now().Add(-(10 * Lease))
|
||||
updateTimeStr := types.NewTime(types.FromGoTime(updateTime), mysql.TypeTimestamp, 3).String()
|
||||
_, err = exec.ExecuteInternal(context.TODO(), `DELETE FROM mysql.bind_info WHERE status = 'deleted' and update_time < %?`, updateTimeStr)
|
||||
return err
|
||||
}
|
||||
|
||||
// lockBindInfoTable simulates `LOCK TABLE mysql.bind_info WRITE` by acquiring a pessimistic lock on a
|
||||
// special builtin row of mysql.bind_info. Note that this function must be called with h.sctx.Lock() held.
|
||||
// We can replace this implementation to normal `LOCK TABLE mysql.bind_info WRITE` if that feature is
|
||||
|
||||
@ -990,12 +990,13 @@ func (do *Domain) LoadBindInfoLoop(ctxForHandle sessionctx.Context, ctxForEvolve
|
||||
return err
|
||||
}
|
||||
|
||||
do.globalBindHandleWorkerLoop()
|
||||
do.handleEvolvePlanTasksLoop(ctxForEvolve)
|
||||
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
|
||||
do.globalBindHandleWorkerLoop(owner)
|
||||
do.handleEvolvePlanTasksLoop(ctxForEvolve, owner)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (do *Domain) globalBindHandleWorkerLoop() {
|
||||
func (do *Domain) globalBindHandleWorkerLoop(owner owner.Manager) {
|
||||
do.wg.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
@ -1004,10 +1005,15 @@ func (do *Domain) globalBindHandleWorkerLoop() {
|
||||
util.Recover(metrics.LabelDomain, "globalBindHandleWorkerLoop", nil, false)
|
||||
}()
|
||||
bindWorkerTicker := time.NewTicker(bindinfo.Lease)
|
||||
defer bindWorkerTicker.Stop()
|
||||
gcBindTicker := time.NewTicker(100 * bindinfo.Lease)
|
||||
defer func() {
|
||||
bindWorkerTicker.Stop()
|
||||
gcBindTicker.Stop()
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-do.exit:
|
||||
owner.Cancel()
|
||||
return
|
||||
case <-bindWorkerTicker.C:
|
||||
err := do.bindHandle.Update(false)
|
||||
@ -1019,12 +1025,20 @@ func (do *Domain) globalBindHandleWorkerLoop() {
|
||||
do.bindHandle.CaptureBaselines()
|
||||
}
|
||||
do.bindHandle.SaveEvolveTasksToStore()
|
||||
case <-gcBindTicker.C:
|
||||
if !owner.IsOwner() {
|
||||
continue
|
||||
}
|
||||
err := do.bindHandle.GCBindRecord()
|
||||
if err != nil {
|
||||
logutil.BgLogger().Error("GC bind record failed", zap.Error(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context) {
|
||||
func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context, owner owner.Manager) {
|
||||
do.wg.Add(1)
|
||||
go func() {
|
||||
defer func() {
|
||||
@ -1032,7 +1046,6 @@ func (do *Domain) handleEvolvePlanTasksLoop(ctx sessionctx.Context) {
|
||||
logutil.BgLogger().Info("handleEvolvePlanTasksLoop exited.")
|
||||
util.Recover(metrics.LabelDomain, "handleEvolvePlanTasksLoop", nil, false)
|
||||
}()
|
||||
owner := do.newOwnerManager(bindinfo.Prompt, bindinfo.OwnerKey)
|
||||
for {
|
||||
select {
|
||||
case <-do.exit:
|
||||
|
||||
Reference in New Issue
Block a user