Files
tidb/pkg/dxf/operator/wrapper.go

142 lines
3.4 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package operator
import (
"fmt"
"github.com/pingcap/tidb/pkg/resourcemanager/pool/workerpool"
"golang.org/x/sync/errgroup"
)
// SimpleDataSource is a simple operator which use the given input slice as the data source.
type SimpleDataSource[T workerpool.TaskMayPanic] struct {
ctx *workerpool.Context
errGroup *errgroup.Group
inputs []T
target DataChannel[T]
}
// NewSimpleDataSource creates a new SimpleOperator with the given inputs.
// The input workerpool.Context is used to quit this operator.
// By using the same context as the downstream operators, we can ensure that
// this operator will quit when other operators encounter an error or panic.
func NewSimpleDataSource[T workerpool.TaskMayPanic](
ctx *workerpool.Context,
inputs []T,
) *SimpleDataSource[T] {
return &SimpleDataSource[T]{
inputs: inputs,
errGroup: &errgroup.Group{},
ctx: ctx,
}
}
// Open implements the Operator interface.
func (s *SimpleDataSource[T]) Open() error {
s.errGroup.Go(func() error {
defer s.target.Finish()
for _, input := range s.inputs {
select {
case s.target.Channel() <- input:
case <-s.ctx.Done():
return s.ctx.Err()
}
}
return nil
})
return nil
}
// Close implements the Operator interface.
func (s *SimpleDataSource[T]) Close() error {
return s.errGroup.Wait()
}
// String implements the Operator interface.
func (*SimpleDataSource[T]) String() string {
var zT T
return fmt.Sprintf("SimpleDataSource[%T]", zT)
}
// SetSink implements the WithSink interface.
func (s *SimpleDataSource[T]) SetSink(ch DataChannel[T]) {
s.target = ch
}
type simpleSink[R any] struct {
ctx *workerpool.Context
errGroup errgroup.Group
drainer func(R)
source DataChannel[R]
}
func newSimpleSink[R any](ctx *workerpool.Context, drainer func(R)) *simpleSink[R] {
return &simpleSink[R]{
ctx: ctx,
drainer: drainer,
}
}
func (s *simpleSink[R]) Open() error {
s.errGroup.Go(func() error {
for {
select {
case <-s.ctx.Done():
return s.ctx.Err()
case data, ok := <-s.source.Channel():
if !ok {
return nil
}
s.drainer(data)
}
}
})
return nil
}
func (s *simpleSink[R]) Close() error {
return s.errGroup.Wait()
}
func (s *simpleSink[T]) SetSource(ch DataChannel[T]) {
s.source = ch
}
func (*simpleSink[R]) String() string {
return "simpleSink"
}
type simpleOperator[T workerpool.TaskMayPanic, R any] struct {
*AsyncOperator[T, R]
}
func (s *simpleOperator[T, R]) String() string {
return fmt.Sprintf("simpleOperator(%s)", s.AsyncOperator.String())
}
func newSimpleOperator[T workerpool.TaskMayPanic, R any](
ctx *workerpool.Context,
transform func(task T) R,
concurrency int,
) *simpleOperator[T, R] {
asyncOp := NewAsyncOperatorWithTransform(ctx, "simple", concurrency, transform)
return &simpleOperator[T, R]{
AsyncOperator: asyncOp,
}
}