Files
tidb/pkg/util/chunk/row_container_reader.go
2024-10-28 07:48:39 +00:00

171 lines
4.4 KiB
Go

// Copyright 2023 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package chunk
import (
"context"
"runtime"
"sync"
"github.com/pingcap/errors"
"github.com/pingcap/failpoint"
"github.com/pingcap/tidb/pkg/util/logutil"
)
// RowContainerReader is a forward-only iterator for the row container. It provides an interface similar to other
// iterators, but it doesn't provide a `ReachEnd` function and must be closed manually to release its goroutine.
//
// The recommended usage pattern is shown below. The iterator is declared before the loop so that it is still in
// scope for the Close and Error calls that follow the loop:
//
//	iter := NewRowContainerReader(rc)
//	for ; iter.Current() != iter.End(); iter.Next() {
//		...
//	}
//	iter.Close()
//	if iter.Error() != nil {
//		// handle the error
//	}
type RowContainerReader interface {
// Next advances the iterator and returns the new current Row, or the End Row once the rows are exhausted.
Next() Row
// Current returns the Row the iterator is positioned on, without advancing.
Current() Row
// End returns the invalid sentinel Row that marks the end of the iteration.
End() Row
// Error returns a non-nil error if anything went wrong during the iteration.
Error() error
// Close stops the reader and releases the goroutine it owns.
Close()
}
// Compile-time check that *rowContainerReader satisfies RowContainerReader. A typed nil pointer is enough for
// the check and avoids building a throwaway value.
var _ RowContainerReader = (*rowContainerReader)(nil)
// rowContainerReader is a forward-only iterator for the row container.
// It spawns one worker goroutine (see startWorker) that fetches chunks from the row container one at a time and
// sends their rows to `rowCh`. A row is sent only after GetChunk has returned the whole chunk, to avoid
// concurrent reads and writes on the chunk.
//
// TODO: record the memory allocated for the channel and chunks.
type rowContainerReader struct {
// ctx, cancel and wg are used to stop the worker goroutine and to wait until it has exited.
ctx context.Context
cancel func()
wg sync.WaitGroup
// rc is the row container being iterated.
rc *RowContainer
// currentRow is the row most recently returned by Next; it is the End row once the iteration is over.
currentRow Row
// rowCh carries rows from the worker goroutine to Next. The worker closes it when it exits.
rowCh chan Row
// err is only ever set by the worker goroutine, when it fails to get a chunk.
err error
}
// Next implements RowContainerReader. It blocks until another row arrives on `rowCh`, records that row as the
// current one and returns it. Once `rowCh` has been closed and drained, the current row is set to End() and
// End() is returned on this and every later call.
func (reader *rowContainerReader) Next() Row {
	if row, ok := <-reader.rowCh; ok {
		reader.currentRow = row
		return row
	}
	// The channel is closed and empty: park the iterator on the end sentinel.
	end := reader.End()
	reader.currentRow = end
	return end
}
// Current implements RowContainerReader. It returns the row recorded by the most recent call to Next and does
// not receive anything from the channel itself.
func (reader *rowContainerReader) Current() Row {
	return reader.currentRow
}
// End implements RowContainerReader. The end sentinel is the zero-value Row; it does not depend on the state of
// the reader, which is why the receiver is unnamed.
func (*rowContainerReader) End() Row {
	return Row{}
}
// Error implements RowContainerReader. It returns the error stored by the worker goroutine, or nil if none was
// recorded. The field is read here without any locking.
// NOTE(review): presumably this is meant to be called once Next has returned End() or Close has returned, when
// the worker is no longer writing; confirm before calling it in the middle of an iteration.
func (reader *rowContainerReader) Error() error {
	return reader.err
}
// initializeChannel allocates `rowCh`, sizing its buffer from the row container:
//   - with no chunks at all, a fixed capacity of 1024 is used;
//   - otherwise the capacity is twice the number of rows in chunk 0.
func (reader *rowContainerReader) initializeChannel() {
	// Nothing to size the buffer from, so fall back to the fixed capacity.
	if reader.rc.NumChunks() == 0 {
		reader.rowCh = make(chan Row, 1024)
		return
	}

	// The first chunk stands in for the size of every chunk. The buffer should be larger than a single chunk so
	// that the worker is not stuck sending rows instead of starting on the next chunk; doubling leaves margin.
	// NOTE(review): if chunk 0 has no rows this yields an unbuffered channel.
	firstChunkRows := reader.rc.NumRowsOfChunk(0)
	reader.rowCh = make(chan Row, 2*firstChunkRows)
}
// Close implements RowContainerReader. It first cancels the reader's context, which lets a worker blocked on
// sending to `rowCh` give up, and then blocks until the wait group reports that the worker has finished.
func (reader *rowContainerReader) Close() {
	// Order matters: waiting before cancelling could block on a worker that is stuck on a full channel.
	reader.cancel()
	reader.wg.Wait()
}
// startWorker launches the single goroutine that feeds `rowCh`. The goroutine walks the chunks of the row
// container in index order and sends every row of each chunk to `rowCh`. It stops early when GetChunk fails
// (the error is stored in reader.err) or when the reader's context is cancelled. On every exit path the wait
// group is released and `rowCh` is closed.
func (reader *rowContainerReader) startWorker() {
	reader.wg.Add(1)
	go func() {
		// Deferred calls run last-in-first-out: wg.Done fires first, then the channel is closed.
		defer close(reader.rowCh)
		defer reader.wg.Done()

		// The chunk count is read once, before the first chunk is fetched.
		numChunks := reader.rc.NumChunks()
		for idx := range numChunks {
			chk, err := reader.rc.GetChunk(idx)
			// Failpoint marker: lets tests force the GetChunk call to look like a failure.
			failpoint.Inject("get-chunk-error", func(val failpoint.Value) {
				if val.(bool) {
					err = errors.New("fail to get chunk for test")
				}
			})
			if err != nil {
				reader.err = err
				return
			}

			// Rows are sent only now that the whole chunk is available.
			numRows := chk.NumRows()
			for rowIdx := range numRows {
				select {
				case <-reader.ctx.Done():
					// The reader was closed; drop the remaining rows.
					return
				case reader.rowCh <- chk.GetRow(rowIdx):
				}
			}
		}
	}()
}
// NewRowContainerReader creates a forward-only iterator for the row container.
//
// It allocates the row channel, starts the worker goroutine and calls Next once, so the returned reader is
// already positioned on the first row (or on End() if there is none). The caller is expected to call Close when
// done. As a fallback, a finalizer logs a warning and closes the reader if its context was never cancelled.
func NewRowContainerReader(rc *RowContainer) *rowContainerReader {
	reader := &rowContainerReader{rc: rc}
	reader.ctx, reader.cancel = context.WithCancel(context.Background())

	reader.initializeChannel()
	reader.startWorker()
	// Prime the iterator so that Current() is meaningful right away.
	reader.Next()

	// NOTE(review): the worker goroutine's closure references the reader, so this finalizer presumably cannot
	// run while the worker is still alive (e.g. blocked on a full channel) — confirm whether that is intended.
	runtime.SetFinalizer(reader, func(leaked *rowContainerReader) {
		if leaked.ctx.Err() != nil {
			// The context is already cancelled, i.e. Close has been called; nothing to clean up.
			return
		}
		logutil.BgLogger().Warn("rowContainerReader is closed by finalizer")
		leaked.Close()
	})
	return reader
}