ddl: add the ddl notifier as a listener to the stats owner (#56795)

ref pingcap/tidb#55722
Rustin
2024-10-25 10:54:48 +08:00
committed by GitHub
parent ed9a909395
commit 3ebfad0861
5 changed files with 109 additions and 51 deletions


@ -14,8 +14,9 @@ go_library(
"//pkg/ddl/session",
"//pkg/kv",
"//pkg/meta/model",
"//pkg/session/types",
"//pkg/owner",
"//pkg/sessionctx",
"//pkg/util",
"//pkg/util/intest",
"//pkg/util/logutil",
"@com_github_pingcap_errors//:errors",
@ -41,6 +42,8 @@ go_test(
"//pkg/sessionctx",
"//pkg/testkit",
"//pkg/testkit/testfailpoint",
"//pkg/util",
"@com_github_ngaut_pools//:pools",
"@com_github_stretchr_testify//require",
],
)


@ -23,8 +23,9 @@ import (
"github.com/pingcap/errors"
sess "github.com/pingcap/tidb/pkg/ddl/session"
"github.com/pingcap/tidb/pkg/kv"
"github.com/pingcap/tidb/pkg/session/types"
"github.com/pingcap/tidb/pkg/owner"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/util"
"github.com/pingcap/tidb/pkg/util/intest"
"github.com/pingcap/tidb/pkg/util/logutil"
"go.uber.org/zap"
@ -79,9 +80,23 @@ func (id HandlerID) String() string {
}
}
// Ensure DDLNotifier implements the owner.Listener interface.
// The DDLNotifier is started only when this node is elected as the stats owner, to ensure consistency.
// This design is crucial because:
// 1. The stats handler (priority queue) processes DDLNotifier events in memory.
// 2. Keeping the stats handler and DDLNotifier on the same node maintains data integrity.
// 3. It prevents potential race conditions or inconsistencies that could arise from
// distributed processing of these events across multiple nodes.
var _ owner.Listener = (*DDLNotifier)(nil)
// DDLNotifier implements the subscription on DDL events.
type DDLNotifier struct {
ownedSess types.Session
// The context is initialized in OnBecomeOwner and canceled in Stop.
ctx context.Context
cancel context.CancelFunc
wg util.WaitGroupWrapper
sysSessionPool util.SessionPool
store Store
handlers map[HandlerID]SchemaChangeHandler
pollInterval time.Duration
@ -90,19 +105,17 @@ type DDLNotifier struct {
handlersBitMap uint64
}
// NewDDLNotifier initializes the global DDLNotifier. It should be called only
// once and before any RegisterHandler call. The ownership of the sctx is passed
// to the DDLNotifier.
// NewDDLNotifier initializes the global DDLNotifier.
func NewDDLNotifier(
sess types.Session,
sysSessionPool util.SessionPool,
store Store,
pollInterval time.Duration,
) *DDLNotifier {
return &DDLNotifier{
ownedSess: sess,
store: store,
handlers: make(map[HandlerID]SchemaChangeHandler),
pollInterval: pollInterval,
sysSessionPool: sysSessionPool,
store: store,
handlers: make(map[HandlerID]SchemaChangeHandler),
pollInterval: pollInterval,
}
}
@ -125,13 +138,14 @@ func (n *DDLNotifier) RegisterHandler(id HandlerID, handler SchemaChangeHandler)
n.handlers[id] = handler
}
// Start starts the DDLNotifier. It will block until the context is canceled.
func (n *DDLNotifier) Start(ctx context.Context) {
// start starts the DDLNotifier. It blocks until the context is canceled.
// Do not call this function directly; use the owner.Listener interface instead.
func (n *DDLNotifier) start() {
for id := range n.handlers {
n.handlersBitMap |= 1 << id
}
ctx = kv.WithInternalSourceType(ctx, kv.InternalDDLNotifier)
ctx := kv.WithInternalSourceType(n.ctx, kv.InternalDDLNotifier)
ctx = logutil.WithCategory(ctx, "ddl-notifier")
ticker := time.NewTicker(n.pollInterval)
defer ticker.Stop()
@ -148,7 +162,14 @@ func (n *DDLNotifier) Start(ctx context.Context) {
}
func (n *DDLNotifier) processEvents(ctx context.Context) error {
changes, err := n.store.List(ctx, sess.NewSession(n.ownedSess))
sysSession, err := n.sysSessionPool.Get()
if err != nil {
return errors.Trace(err)
}
defer n.sysSessionPool.Put(sysSession)
session := sess.NewSession(sysSession.(sessionctx.Context))
changes, err := n.store.List(ctx, session)
if err != nil {
return errors.Trace(err)
}
@ -161,7 +182,7 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error {
if _, ok := skipHandlers[handlerID]; ok {
continue
}
if err2 := n.processEventForHandler(ctx, change, handlerID, handler); err2 != nil {
if err2 := n.processEventForHandler(ctx, session, change, handlerID, handler); err2 != nil {
skipHandlers[handlerID] = struct{}{}
if !goerr.Is(err2, ErrNotReadyRetryLater) {
@ -187,7 +208,7 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error {
if change.processedByFlag == n.handlersBitMap {
if err2 := n.store.DeleteAndCommit(
ctx,
sess.NewSession(n.ownedSess),
session,
change.ddlJobID,
int(change.multiSchemaChangeSeq),
); err2 != nil {
@ -206,6 +227,7 @@ const slowHandlerLogThreshold = time.Second * 5
func (n *DDLNotifier) processEventForHandler(
ctx context.Context,
session *sess.Session,
change *schemaChange,
handlerID HandlerID,
handler SchemaChangeHandler,
@ -214,21 +236,19 @@ func (n *DDLNotifier) processEventForHandler(
return nil
}
se := sess.NewSession(n.ownedSess)
if err = se.Begin(ctx); err != nil {
if err = session.Begin(ctx); err != nil {
return errors.Trace(err)
}
defer func() {
if err == nil {
err = errors.Trace(se.Commit(ctx))
err = errors.Trace(session.Commit(ctx))
} else {
se.Rollback()
session.Rollback()
}
}()
now := time.Now()
if err = handler(ctx, n.ownedSess, change.event); err != nil {
if err = handler(ctx, session.Context, change.event); err != nil {
return errors.Trace(err)
}
if time.Since(now) > slowHandlerLogThreshold {
@ -243,7 +263,7 @@ func (n *DDLNotifier) processEventForHandler(
newFlag := change.processedByFlag | (1 << handlerID)
if err = n.store.UpdateProcessed(
ctx,
se,
session,
change.ddlJobID,
change.multiSchemaChangeSeq,
newFlag,
@ -255,7 +275,36 @@ func (n *DDLNotifier) processEventForHandler(
return nil
}
// Close releases the resources.
func (n *DDLNotifier) Close() {
n.ownedSess.Close()
// Stop stops the background loop and waits for it to exit.
// It is exported only for testing; do not call it directly in production.
// Use the owner.Listener interface instead.
func (n *DDLNotifier) Stop() {
// If the notifier is not started, the cancel function is nil.
if n.cancel == nil {
return
}
n.cancel()
n.wg.Wait()
}
// OnBecomeOwner implements the owner.Listener interface.
// We need to make sure only one DDLNotifier is running at any time.
func (n *DDLNotifier) OnBecomeOwner() {
n.ctx, n.cancel = context.WithCancel(context.Background())
n.wg.RunWithRecover(n.start, func(r any) {
if r == nil {
return
}
// In unit tests, we want to panic directly to find the root cause.
if intest.InTest {
panic(r)
}
logutil.BgLogger().Error("panic in ddl notifier", zap.Any("recover", r), zap.Stack("stack"))
})
}
// OnRetireOwner implements the owner.Listener interface.
// After the owner retires, the DDLNotifier must be stopped.
func (n *DDLNotifier) OnRetireOwner() {
n.Stop()
}
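For readers skimming the diff, the lifecycle above can be summarized outside TiDB's packages. Below is a minimal, self-contained Go sketch of the same pattern — a toy Listener interface and notifier, not the real pkg/owner API — showing how the ownership callback starts the polling goroutine and how retiring cancels it and waits for it to exit, mirroring OnBecomeOwner, OnRetireOwner, and Stop above.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Listener mirrors the shape of an owner listener: hooks that fire when this
// node gains or loses ownership. The real owner.Listener interface may differ.
type Listener interface {
	OnBecomeOwner()
	OnRetireOwner()
}

// toyNotifier runs its polling loop only while this node is the owner.
type toyNotifier struct {
	ctx    context.Context
	cancel context.CancelFunc
	wg     sync.WaitGroup
}

func (n *toyNotifier) OnBecomeOwner() {
	n.ctx, n.cancel = context.WithCancel(context.Background())
	n.wg.Add(1)
	go func() {
		defer n.wg.Done()
		ticker := time.NewTicker(50 * time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-n.ctx.Done():
				return
			case <-ticker.C:
				fmt.Println("poll pending DDL events")
			}
		}
	}()
}

func (n *toyNotifier) OnRetireOwner() {
	if n.cancel == nil {
		return // never started
	}
	n.cancel()
	n.wg.Wait() // wait for the loop to exit before returning
}

func main() {
	var l Listener = &toyNotifier{}
	l.OnBecomeOwner() // this node won the stats owner election
	time.Sleep(120 * time.Millisecond)
	l.OnRetireOwner() // ownership moved elsewhere; stop the loop
}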


@ -22,6 +22,7 @@ import (
"testing"
"time"
"github.com/ngaut/pools"
"github.com/pingcap/tidb/pkg/ddl"
"github.com/pingcap/tidb/pkg/ddl/notifier"
sess "github.com/pingcap/tidb/pkg/ddl/session"
@ -30,6 +31,7 @@ import (
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/testkit"
"github.com/pingcap/tidb/pkg/testkit/testfailpoint"
"github.com/pingcap/tidb/pkg/util"
"github.com/stretchr/testify/require"
)
@ -61,10 +63,17 @@ func TestBasicPubSub(t *testing.T) {
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
tk.MustExec(ddl.NotifierTableSQL)
ctx, cancel := context.WithCancel(context.Background())
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
sessionPool := util.NewSessionPool(
1,
func() (pools.Resource, error) {
return tk.Session(), nil
},
nil,
nil,
)
n := notifier.NewDDLNotifier(tk.Session(), s, 50*time.Millisecond)
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
var seenChangesMu sync.Mutex
seenChanges := make([]*notifier.SchemaChangeEvent, 0, 8)
@ -94,12 +103,13 @@ func TestBasicPubSub(t *testing.T) {
done := make(chan struct{})
go func() {
n.Start(ctx)
n.OnBecomeOwner()
close(done)
}()
tk2 := testkit.NewTestKit(t, store)
se := sess.NewSession(tk2.Session())
ctx := context.Background()
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
require.NoError(t, err)
@ -119,8 +129,7 @@ func TestBasicPubSub(t *testing.T) {
require.Equal(t, event1, seenChanges[0])
require.Equal(t, event2, seenChanges[1])
require.Equal(t, event3, seenChanges[2])
cancel()
n.OnRetireOwner()
<-done
}
@ -131,10 +140,16 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName)
tk.MustExec(ddl.NotifierTableSQL)
ctx, cancel := context.WithCancel(context.Background())
s := notifier.OpenTableStore("test", ddl.NotifierTableName)
n := notifier.NewDDLNotifier(tk.Session(), s, 50*time.Millisecond)
sessionPool := util.NewSessionPool(
1,
func() (pools.Resource, error) {
return tk.Session(), nil
},
nil,
nil,
)
n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond)
newRndFailHandler := func() (notifier.SchemaChangeHandler, *[]int64) {
maxFail := 5
@ -166,13 +181,13 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
done := make(chan struct{})
go func() {
n.Start(ctx)
n.OnBecomeOwner()
close(done)
}()
tk2 := testkit.NewTestKit(t, store)
se := sess.NewSession(tk2.Session())
ctx := context.Background()
event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")})
err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s)
require.NoError(t, err)
@ -193,7 +208,7 @@ func TestDeliverOrderAndCleanup(t *testing.T) {
require.Equal(t, []int64{1000, 1001, 1002}, *id2)
require.Equal(t, []int64{1000, 1001, 1002}, *id3)
cancel()
n.OnRetireOwner()
<-done
}
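The tests now build a small session pool instead of handing the notifier a single owned session, and processEvents borrows a session per polling cycle and returns it when done. A self-contained sketch of that borrow/return pattern follows — the pool type and names are hypothetical stand-ins, not pkg/util's SessionPool.

package main

import (
	"errors"
	"fmt"
)

// session stands in for a system session checked out from the pool.
type session struct{ id int }

// pool is a toy fixed-capacity pool backed by a buffered channel.
type pool struct{ ch chan *session }

func newPool(capacity int) *pool {
	p := &pool{ch: make(chan *session, capacity)}
	for i := 0; i < capacity; i++ {
		p.ch <- &session{id: i}
	}
	return p
}

// Get takes a session out of the pool, failing instead of blocking when empty.
func (p *pool) Get() (*session, error) {
	select {
	case s := <-p.ch:
		return s, nil
	default:
		return nil, errors.New("session pool exhausted")
	}
}

// Put returns a session to the pool.
func (p *pool) Put(s *session) { p.ch <- s }

// processEvents borrows one session for the whole polling cycle, reuses it for
// every store call, and returns it to the pool when the cycle finishes.
func processEvents(p *pool) error {
	s, err := p.Get()
	if err != nil {
		return err
	}
	defer p.Put(s) // always give the session back, even on error paths
	fmt.Printf("list, handle, and clean up events with session %d\n", s.id)
	return nil
}

func main() {
	p := newPool(1)
	if err := processEvents(p); err != nil {
		fmt.Println(err)
	}
}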


@ -57,7 +57,6 @@ go_library(
"//pkg/planner/core/metrics",
"//pkg/privilege/privileges",
"//pkg/resourcegroup/runaway",
"//pkg/session/types",
"//pkg/sessionctx",
"//pkg/sessionctx/sessionstates",
"//pkg/sessionctx/sysproctrack",


@ -65,7 +65,6 @@ import (
metrics2 "github.com/pingcap/tidb/pkg/planner/core/metrics"
"github.com/pingcap/tidb/pkg/privilege/privileges"
"github.com/pingcap/tidb/pkg/resourcegroup/runaway"
sessiontypes "github.com/pingcap/tidb/pkg/session/types"
"github.com/pingcap/tidb/pkg/sessionctx"
"github.com/pingcap/tidb/pkg/sessionctx/sessionstates"
"github.com/pingcap/tidb/pkg/sessionctx/sysproctrack"
@ -1380,12 +1379,8 @@ func (do *Domain) Init(
var ddlNotifierStore notifier.Store
if intest.InTest {
ddlNotifierStore = notifier.OpenTableStore("mysql", ddl.NotifierTableName)
se, err := do.sysExecutorFactory(do)
if err != nil {
return err
}
do.ddlNotifier = notifier.NewDDLNotifier(
se.(sessiontypes.Session),
do.sysSessionPool,
ddlNotifierStore,
time.Second,
)
@ -1511,12 +1506,6 @@ func (do *Domain) Start() error {
return err
}
do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher()
if intest.InTest {
do.wg.Run(func() {
do.ddlNotifier.Start(do.ctx)
do.ddlNotifier.Close()
}, "ddlNotifier")
}
// Local store needs to get the change information for every DDL state in each session.
do.wg.Run(func() {
@ -2350,6 +2339,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err
variable.EnableStatsOwner = do.enableStatsOwner
variable.DisableStatsOwner = do.disableStatsOwner
do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey)
if intest.InTest {
do.statsOwner.SetListener(do.ddlNotifier)
}
do.wg.Run(func() {
do.indexUsageWorker()
}, "indexUsageWorker")