diff --git a/pkg/ddl/notifier/BUILD.bazel b/pkg/ddl/notifier/BUILD.bazel index f2a899dd1e..dfff723dc6 100644 --- a/pkg/ddl/notifier/BUILD.bazel +++ b/pkg/ddl/notifier/BUILD.bazel @@ -14,8 +14,9 @@ go_library( "//pkg/ddl/session", "//pkg/kv", "//pkg/meta/model", - "//pkg/session/types", + "//pkg/owner", "//pkg/sessionctx", + "//pkg/util", "//pkg/util/intest", "//pkg/util/logutil", "@com_github_pingcap_errors//:errors", @@ -41,6 +42,8 @@ go_test( "//pkg/sessionctx", "//pkg/testkit", "//pkg/testkit/testfailpoint", + "//pkg/util", + "@com_github_ngaut_pools//:pools", "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ddl/notifier/subscribe.go b/pkg/ddl/notifier/subscribe.go index cfe9fa949f..2d4569302e 100644 --- a/pkg/ddl/notifier/subscribe.go +++ b/pkg/ddl/notifier/subscribe.go @@ -23,8 +23,9 @@ import ( "github.com/pingcap/errors" sess "github.com/pingcap/tidb/pkg/ddl/session" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/session/types" + "github.com/pingcap/tidb/pkg/owner" "github.com/pingcap/tidb/pkg/sessionctx" + "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" "go.uber.org/zap" @@ -79,9 +80,23 @@ func (id HandlerID) String() string { } } +// Ensure DDLNotifier implements the owner.Listener interface. +// The DDLNotifier is started only when the stats owner is elected to ensure consistency. +// This design is crucial because: +// 1. The stats handler(priority queue) processes DDLNotifier events in memory. +// 2. Keeping the stats handler and DDLNotifier on the same node maintains data integrity. +// 3. It prevents potential race conditions or inconsistencies that could arise from +// distributed processing of these events across multiple nodes. +var _ owner.Listener = (*DDLNotifier)(nil) + // DDLNotifier implements the subscription on DDL events. 
type DDLNotifier struct { - ownedSess types.Session + // The context is initialized in OnBecomeOwner and canceled in Stop. + ctx context.Context + cancel context.CancelFunc + wg util.WaitGroupWrapper + sysSessionPool util.SessionPool + store Store handlers map[HandlerID]SchemaChangeHandler pollInterval time.Duration @@ -90,19 +105,17 @@ type DDLNotifier struct { handlersBitMap uint64 } -// NewDDLNotifier initializes the global DDLNotifier. It should be called only -// once and before any RegisterHandler call. The ownership of the sctx is passed -// to the DDLNotifier. +// NewDDLNotifier initializes the global DDLNotifier. func NewDDLNotifier( - sess types.Session, + sysSessionPool util.SessionPool, store Store, pollInterval time.Duration, ) *DDLNotifier { return &DDLNotifier{ - ownedSess: sess, - store: store, - handlers: make(map[HandlerID]SchemaChangeHandler), - pollInterval: pollInterval, + sysSessionPool: sysSessionPool, + store: store, + handlers: make(map[HandlerID]SchemaChangeHandler), + pollInterval: pollInterval, } } @@ -125,13 +138,14 @@ func (n *DDLNotifier) RegisterHandler(id HandlerID, handler SchemaChangeHandler) n.handlers[id] = handler } -// Start starts the DDLNotifier. It will block until the context is canceled. -func (n *DDLNotifier) Start(ctx context.Context) { +// start starts the DDLNotifier. It will block until n.ctx is canceled. +// Do not call this function directly. Use the owner.Listener interface instead. 
+func (n *DDLNotifier) start() { for id := range n.handlers { n.handlersBitMap |= 1 << id } - ctx = kv.WithInternalSourceType(ctx, kv.InternalDDLNotifier) + ctx := kv.WithInternalSourceType(n.ctx, kv.InternalDDLNotifier) ctx = logutil.WithCategory(ctx, "ddl-notifier") ticker := time.NewTicker(n.pollInterval) defer ticker.Stop() @@ -148,7 +162,14 @@ func (n *DDLNotifier) Start(ctx context.Context) { } func (n *DDLNotifier) processEvents(ctx context.Context) error { - changes, err := n.store.List(ctx, sess.NewSession(n.ownedSess)) + sysSession, err := n.sysSessionPool.Get() + if err != nil { + return errors.Trace(err) + } + defer n.sysSessionPool.Put(sysSession) + + session := sess.NewSession(sysSession.(sessionctx.Context)) + changes, err := n.store.List(ctx, session) if err != nil { return errors.Trace(err) } @@ -161,7 +182,7 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error { if _, ok := skipHandlers[handlerID]; ok { continue } - if err2 := n.processEventForHandler(ctx, change, handlerID, handler); err2 != nil { + if err2 := n.processEventForHandler(ctx, session, change, handlerID, handler); err2 != nil { skipHandlers[handlerID] = struct{}{} if !goerr.Is(err2, ErrNotReadyRetryLater) { @@ -187,7 +208,7 @@ func (n *DDLNotifier) processEvents(ctx context.Context) error { if change.processedByFlag == n.handlersBitMap { if err2 := n.store.DeleteAndCommit( ctx, - sess.NewSession(n.ownedSess), + session, change.ddlJobID, int(change.multiSchemaChangeSeq), ); err2 != nil { @@ -206,6 +227,7 @@ const slowHandlerLogThreshold = time.Second * 5 func (n *DDLNotifier) processEventForHandler( ctx context.Context, + session *sess.Session, change *schemaChange, handlerID HandlerID, handler SchemaChangeHandler, @@ -214,21 +236,19 @@ func (n *DDLNotifier) processEventForHandler( return nil } - se := sess.NewSession(n.ownedSess) - - if err = se.Begin(ctx); err != nil { + if err = session.Begin(ctx); err != nil { return errors.Trace(err) } defer func() { if err == nil { 
- err = errors.Trace(se.Commit(ctx)) + err = errors.Trace(session.Commit(ctx)) } else { - se.Rollback() + session.Rollback() } }() now := time.Now() - if err = handler(ctx, n.ownedSess, change.event); err != nil { + if err = handler(ctx, session.Context, change.event); err != nil { return errors.Trace(err) } if time.Since(now) > slowHandlerLogThreshold { @@ -243,7 +263,7 @@ func (n *DDLNotifier) processEventForHandler( newFlag := change.processedByFlag | (1 << handlerID) if err = n.store.UpdateProcessed( ctx, - se, + session, change.ddlJobID, change.multiSchemaChangeSeq, newFlag, @@ -255,7 +275,36 @@ func (n *DDLNotifier) processEventForHandler( return nil } -// Close releases the resources. -func (n *DDLNotifier) Close() { - n.ownedSess.Close() +// Stop stops the background loop. +// Exposed for testing. +// Do not call this function directly. Use owner.Listener interface instead. +func (n *DDLNotifier) Stop() { + // If the notifier is not started, the cancel function is nil. + if n.cancel == nil { + return + } + n.cancel() + n.wg.Wait() +} + +// OnBecomeOwner implements the owner.Listener interface. +// We need to make sure only one DDLNotifier is running at any time. +func (n *DDLNotifier) OnBecomeOwner() { + n.ctx, n.cancel = context.WithCancel(context.Background()) + n.wg.RunWithRecover(n.start, func(r any) { + if r == nil { + return + } + // In unit tests, we want to panic directly to find the root cause. + if intest.InTest { + panic(r) + } + logutil.BgLogger().Error("panic in ddl notifier", zap.Any("recover", r), zap.Stack("stack")) + }) +} + +// OnRetireOwner implements the owner.Listener interface. +// After the owner is retired, we need to stop the DDLNotifier. 
+func (n *DDLNotifier) OnRetireOwner() { + n.Stop() } diff --git a/pkg/ddl/notifier/testkit_test.go b/pkg/ddl/notifier/testkit_test.go index 8f19af12d4..cae71be7fb 100644 --- a/pkg/ddl/notifier/testkit_test.go +++ b/pkg/ddl/notifier/testkit_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/ngaut/pools" "github.com/pingcap/tidb/pkg/ddl" "github.com/pingcap/tidb/pkg/ddl/notifier" sess "github.com/pingcap/tidb/pkg/ddl/session" @@ -30,6 +31,7 @@ import ( "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/testkit/testfailpoint" + "github.com/pingcap/tidb/pkg/util" "github.com/stretchr/testify/require" ) @@ -61,10 +63,17 @@ func TestBasicPubSub(t *testing.T) { tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName) tk.MustExec(ddl.NotifierTableSQL) - ctx, cancel := context.WithCancel(context.Background()) s := notifier.OpenTableStore("test", ddl.NotifierTableName) + sessionPool := util.NewSessionPool( + 1, + func() (pools.Resource, error) { + return tk.Session(), nil + }, + nil, + nil, + ) - n := notifier.NewDDLNotifier(tk.Session(), s, 50*time.Millisecond) + n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond) var seenChangesMu sync.Mutex seenChanges := make([]*notifier.SchemaChangeEvent, 0, 8) @@ -94,12 +103,13 @@ func TestBasicPubSub(t *testing.T) { done := make(chan struct{}) go func() { - n.Start(ctx) + n.OnBecomeOwner() close(done) }() tk2 := testkit.NewTestKit(t, store) se := sess.NewSession(tk2.Session()) + ctx := context.Background() event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")}) err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s) require.NoError(t, err) @@ -119,8 +129,7 @@ func TestBasicPubSub(t *testing.T) { require.Equal(t, event1, seenChanges[0]) require.Equal(t, event2, seenChanges[1]) require.Equal(t, event3, seenChanges[2]) - - cancel() + n.OnRetireOwner() <-done } @@ -131,10 +140,16 @@ func 
TestDeliverOrderAndCleanup(t *testing.T) { tk.MustExec("DROP TABLE IF EXISTS " + ddl.NotifierTableName) tk.MustExec(ddl.NotifierTableSQL) - ctx, cancel := context.WithCancel(context.Background()) s := notifier.OpenTableStore("test", ddl.NotifierTableName) - - n := notifier.NewDDLNotifier(tk.Session(), s, 50*time.Millisecond) + sessionPool := util.NewSessionPool( + 1, + func() (pools.Resource, error) { + return tk.Session(), nil + }, + nil, + nil, + ) + n := notifier.NewDDLNotifier(sessionPool, s, 50*time.Millisecond) newRndFailHandler := func() (notifier.SchemaChangeHandler, *[]int64) { maxFail := 5 @@ -166,13 +181,13 @@ func TestDeliverOrderAndCleanup(t *testing.T) { done := make(chan struct{}) go func() { - n.Start(ctx) + n.OnBecomeOwner() close(done) }() tk2 := testkit.NewTestKit(t, store) se := sess.NewSession(tk2.Session()) - + ctx := context.Background() event1 := notifier.NewCreateTableEvent(&model.TableInfo{ID: 1000, Name: pmodel.NewCIStr("t1")}) err := notifier.PubSchemeChangeToStore(ctx, se, 1, -1, event1, s) require.NoError(t, err) @@ -193,7 +208,7 @@ func TestDeliverOrderAndCleanup(t *testing.T) { require.Equal(t, []int64{1000, 1001, 1002}, *id2) require.Equal(t, []int64{1000, 1001, 1002}, *id3) - cancel() + n.OnRetireOwner() <-done } diff --git a/pkg/domain/BUILD.bazel b/pkg/domain/BUILD.bazel index 385e40fa54..296bacaf25 100644 --- a/pkg/domain/BUILD.bazel +++ b/pkg/domain/BUILD.bazel @@ -57,7 +57,6 @@ go_library( "//pkg/planner/core/metrics", "//pkg/privilege/privileges", "//pkg/resourcegroup/runaway", - "//pkg/session/types", "//pkg/sessionctx", "//pkg/sessionctx/sessionstates", "//pkg/sessionctx/sysproctrack", diff --git a/pkg/domain/domain.go b/pkg/domain/domain.go index 9245ade667..8360d308a3 100644 --- a/pkg/domain/domain.go +++ b/pkg/domain/domain.go @@ -65,7 +65,6 @@ import ( metrics2 "github.com/pingcap/tidb/pkg/planner/core/metrics" "github.com/pingcap/tidb/pkg/privilege/privileges" "github.com/pingcap/tidb/pkg/resourcegroup/runaway" - 
sessiontypes "github.com/pingcap/tidb/pkg/session/types" "github.com/pingcap/tidb/pkg/sessionctx" "github.com/pingcap/tidb/pkg/sessionctx/sessionstates" "github.com/pingcap/tidb/pkg/sessionctx/sysproctrack" @@ -1380,12 +1379,8 @@ func (do *Domain) Init( var ddlNotifierStore notifier.Store if intest.InTest { ddlNotifierStore = notifier.OpenTableStore("mysql", ddl.NotifierTableName) - se, err := do.sysExecutorFactory(do) - if err != nil { - return err - } do.ddlNotifier = notifier.NewDDLNotifier( - se.(sessiontypes.Session), + do.sysSessionPool, ddlNotifierStore, time.Second, ) @@ -1511,12 +1506,6 @@ func (do *Domain) Start() error { return err } do.minJobIDRefresher = do.ddl.GetMinJobIDRefresher() - if intest.InTest { - do.wg.Run(func() { - do.ddlNotifier.Start(do.ctx) - do.ddlNotifier.Close() - }, "ddlNotifier") - } // Local store needs to get the change information for every DDL state in each session. do.wg.Run(func() { @@ -2350,6 +2339,9 @@ func (do *Domain) UpdateTableStatsLoop(ctx, initStatsCtx sessionctx.Context) err variable.EnableStatsOwner = do.enableStatsOwner variable.DisableStatsOwner = do.disableStatsOwner do.statsOwner = do.newOwnerManager(handle.StatsPrompt, handle.StatsOwnerKey) + if intest.InTest { + do.statsOwner.SetListener(do.ddlNotifier) + } do.wg.Run(func() { do.indexUsageWorker() }, "indexUsageWorker")