diff --git a/disttask/framework/storage/task_table.go b/disttask/framework/storage/task_table.go index 88c6167f6a..c2ceafec6c 100644 --- a/disttask/framework/storage/task_table.go +++ b/disttask/framework/storage/task_table.go @@ -47,7 +47,12 @@ type SessionExecutor interface { // TaskManager is the manager of global/sub task. type TaskManager struct { ctx context.Context - sePool *pools.ResourcePool + sePool sessionPool +} + +type sessionPool interface { + Get() (pools.Resource, error) + Put(resource pools.Resource) } var _ SessionExecutor = &TaskManager{} @@ -60,7 +65,7 @@ var ( ) // NewTaskManager creates a new task manager. -func NewTaskManager(ctx context.Context, sePool *pools.ResourcePool) *TaskManager { +func NewTaskManager(ctx context.Context, sePool sessionPool) *TaskManager { ctx = util.WithInternalSourceType(ctx, kv.InternalDistTask) return &TaskManager{ ctx: ctx, diff --git a/domain/domain.go b/domain/domain.go index 61eb5a96f3..029431e196 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -186,7 +186,6 @@ type Domain struct { mdlCheckCh chan struct{} stopAutoAnalyze atomicutil.Bool - resourcePool *pools.ResourcePool } type mdlCheckTableInfo struct { @@ -1129,7 +1128,6 @@ func (do *Domain) Init( return sysExecutorFactory(do) } sysCtxPool := pools.NewResourcePool(sysFac, 128, 128, resourceIdleTimeout) - do.resourcePool = sysCtxPool ctx, cancelFunc := context.WithCancel(context.Background()) do.cancel = cancelFunc var callback ddl.Callback @@ -1655,7 +1653,7 @@ func (do *Domain) InitDistTaskLoop(ctx context.Context) error { } }) - taskManager := storage.NewTaskManager(ctx, do.resourcePool) + taskManager := storage.NewTaskManager(ctx, do.sysSessionPool) var serverID string if intest.InTest { do.InitInfo4Test()