global sort: update the part size calculation formula to avoid multipart upload failures (#63918)
close pingcap/tidb#63781
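As context for the diff below: a minimal sketch of the new part-size formula, assuming stand-in values for the external package constants (the 5 MiB minimum, the 250 files per thread, and the 2 GiB memory budget are illustrative, not TiDB's real numbers); only the shape of the formula and the 5000 divisor come from this commit.

package main

import "fmt"

// Illustrative stand-ins for external.MinUploadPartSize, external.MaxMergingFilesPerThread
// and external.MaxUploadPartCount; the concrete values are assumptions for this example only.
const (
	minUploadPartSize        = 5 * 1024 * 1024 // assume a 5 MiB minimum multipart size
	maxMergingFilesPerThread = 250             // assume 250 files merged per thread
	maxUploadPartCount       = 5000            // the new divisor introduced here (was 10000)
)

// partSize mirrors the updated formula: size each part so that the data one merging
// thread produces fits into at most maxUploadPartCount multipart-upload parts.
func partSize(memSizePerCon int64) int64 {
	return max(minUploadPartSize, memSizePerCon*maxMergingFilesPerThread/maxUploadPartCount)
}

func main() {
	memSizePerCon := int64(2 << 30) // assume 2 GiB of memory per concurrency
	ps := partSize(memSizePerCon)

	// Rough upper bound on what one thread merges under these assumptions.
	estimatedOutput := memSizePerCon * maxMergingFilesPerThread
	fmt.Printf("part size: %d bytes, estimated parts: %d\n", ps, estimatedOutput/ps)
	// With the 5000 divisor the estimate lands near 5,000 parts, leaving headroom
	// below S3's 10,000-part limit; the old 10000 divisor sat right at that limit.
}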
@@ -107,7 +107,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subtask
 	prefix := path.Join(strconv.Itoa(int(subtask.TaskID)), strconv.Itoa(int(subtask.ID)))
 	res := m.GetResource()
 	memSizePerCon := res.Mem.Capacity() / res.CPU.Capacity()
-	partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/10000)
+	partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/external.MaxUploadPartCount)
 
 	err = external.MergeOverlappingFiles(
 		ctx,
@@ -380,8 +380,8 @@ func (m *mergeSortStepExecutor) Init(ctx context.Context) error {
 	}
 	m.sortStore = store
 	dataKVMemSizePerCon, perIndexKVMemSizePerCon := getWriterMemorySizeLimit(m.GetResource(), &m.taskMeta.Plan)
-	m.dataKVPartSize = max(external.MinUploadPartSize, int64(dataKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000))
-	m.indexKVPartSize = max(external.MinUploadPartSize, int64(perIndexKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000))
+	m.dataKVPartSize = max(external.MinUploadPartSize, int64(dataKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/external.MaxUploadPartCount))
+	m.indexKVPartSize = max(external.MinUploadPartSize, int64(perIndexKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/external.MaxUploadPartCount))
 
 	m.logger.Info("merge sort partSize",
 		zap.String("data-kv", units.BytesSize(float64(m.dataKVPartSize))),
pkg/lightning/backend/external/onefile_writer.go (vendored, 26 changed lines)
@@ -46,6 +46,14 @@ var (
 	DefaultOneWriterBlockSize = int(defaultOneWriterMemSizeLimit)
 )
 
+const (
+	// MaxUploadPartCount defines the divisor used when calculating the size of each uploaded part.
+	// Lowering it from 10000 to 5000 increases the part size so that the total number of parts stays well below
+	// the S3 multipart upload limit of 10,000 parts, avoiding the error "TotalPartsExceeded: exceeded total allowed configured MaxUploadParts (10000)".
+	MaxUploadPartCount = 5000
+	logPartNumInterval = 999 // log the part num every 999 parts.
+)
+
 // OneFileWriter is used to write data into external storage
 // with only one file for data and stat.
 type OneFileWriter struct {
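Under the same illustrative numbers as the sketch above (2 GiB per concurrency, 250 files per thread), the formula gives a part size of roughly 100 MiB, so a ~500 GiB merged file uploads in about 5,000 parts; with the previous divisor of 10,000 the parts would have been ~50 MiB and the count ~10,000, exactly at the S3 limit, so any output larger than estimated tripped TotalPartsExceeded.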
@@ -88,9 +96,10 @@ type OneFileWriter struct {
 	minKey []byte
 	maxKey []byte
 
-	logger       *zap.Logger
-	partSize     int64
-	writtenBytes int64
+	logger           *zap.Logger
+	partSize         int64
+	writtenBytes     int64
+	lastLogWriteSize uint64
 }
 
 // lazyInitWriter inits the underlying dataFile/statFile path, dataWriter/statWriter
@@ -160,6 +169,17 @@ func (w *OneFileWriter) InitPartSizeAndLogger(ctx context.Context, partSize int64
 
 // WriteRow implements ingest.Writer.
 func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error {
+	defer func() {
+		if (w.totalSize-w.lastLogWriteSize)/uint64(w.partSize) >= logPartNumInterval {
+			w.logger.Info("one file writer progress",
+				zap.String("writerID", w.writerID),
+				zap.Int64("partSize", w.partSize),
+				zap.Uint64("totalSize", w.totalSize),
+				zap.Uint64("estimatePartNum", w.totalSize/uint64(w.partSize)),
+			)
+			w.lastLogWriteSize = w.totalSize
+		}
+	}()
 	if w.onDup != engineapi.OnDuplicateKeyIgnore {
 		// must be Record or Remove right now
 		return w.handleDupAndWrite(ctx, idxKey, idxVal)
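Note on the added progress logging: lastLogWriteSize is reset each time a line is emitted, so the deferred check in WriteRow logs roughly once per logPartNumInterval (999) parts' worth of newly written bytes rather than on every row.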