From ee4eac2ccb83e1ea653b8131d9a43495019cb5ac Mon Sep 17 00:00:00 2001
From: fzzf678 <108643977+fzzf678@users.noreply.github.com>
Date: Tue, 21 Oct 2025 11:19:49 +0800
Subject: [PATCH] global sort: update the part size calculation formula to
 avoid multipart upload failures (#63918)

close pingcap/tidb#63781
---
 pkg/ddl/backfilling_merge_sort.go        |  2 +-
 pkg/disttask/importinto/task_executor.go |  4 ++--
 .../backend/external/onefile_writer.go   | 26 +++++++++++++++++++++++---
 3 files changed, 26 insertions(+), 6 deletions(-)

diff --git a/pkg/ddl/backfilling_merge_sort.go b/pkg/ddl/backfilling_merge_sort.go
index 3e0532e45a..e3ddec39d3 100644
--- a/pkg/ddl/backfilling_merge_sort.go
+++ b/pkg/ddl/backfilling_merge_sort.go
@@ -107,7 +107,7 @@ func (m *mergeSortExecutor) RunSubtask(ctx context.Context, subtask *proto.Subta
 	prefix := path.Join(strconv.Itoa(int(subtask.TaskID)), strconv.Itoa(int(subtask.ID)))
 	res := m.GetResource()
 	memSizePerCon := res.Mem.Capacity() / res.CPU.Capacity()
-	partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/10000)
+	partSize := max(external.MinUploadPartSize, memSizePerCon*int64(external.MaxMergingFilesPerThread)/external.MaxUploadPartCount)
 
 	err = external.MergeOverlappingFiles(
 		ctx,
diff --git a/pkg/disttask/importinto/task_executor.go b/pkg/disttask/importinto/task_executor.go
index 45075de45a..4a57bbfd00 100644
--- a/pkg/disttask/importinto/task_executor.go
+++ b/pkg/disttask/importinto/task_executor.go
@@ -380,8 +380,8 @@ func (m *mergeSortStepExecutor) Init(ctx context.Context) error {
 	}
 	m.sortStore = store
 	dataKVMemSizePerCon, perIndexKVMemSizePerCon := getWriterMemorySizeLimit(m.GetResource(), &m.taskMeta.Plan)
-	m.dataKVPartSize = max(external.MinUploadPartSize, int64(dataKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000))
-	m.indexKVPartSize = max(external.MinUploadPartSize, int64(perIndexKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/10000))
+	m.dataKVPartSize = max(external.MinUploadPartSize, int64(dataKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/external.MaxUploadPartCount))
+	m.indexKVPartSize = max(external.MinUploadPartSize, int64(perIndexKVMemSizePerCon*uint64(external.MaxMergingFilesPerThread)/external.MaxUploadPartCount))
 
 	m.logger.Info("merge sort partSize",
 		zap.String("data-kv", units.BytesSize(float64(m.dataKVPartSize))),
diff --git a/pkg/lightning/backend/external/onefile_writer.go b/pkg/lightning/backend/external/onefile_writer.go
index 4705b32b74..df84d2ded6 100644
--- a/pkg/lightning/backend/external/onefile_writer.go
+++ b/pkg/lightning/backend/external/onefile_writer.go
@@ -46,6 +46,14 @@ var (
 	DefaultOneWriterBlockSize = int(defaultOneWriterMemSizeLimit)
 )
 
+const (
+	// MaxUploadPartCount defines the divisor used when calculating the size of each uploaded part.
+	// Lowering it from 10000 to 5000 increases the part size so that the total number of parts stays well below
+	// the S3 multipart upload limit of 10,000 parts, avoiding the error "TotalPartsExceeded: exceeded total allowed configured MaxUploadParts (10000)".
+	MaxUploadPartCount = 5000
+	logPartNumInterval = 999 // log the part num roughly every 999 parts.
+)
+
 // OneFileWriter is used to write data into external storage
 // with only one file for data and stat.
 type OneFileWriter struct {
@@ -88,9 +96,10 @@ type OneFileWriter struct {
 	minKey []byte
 	maxKey []byte
 
-	logger       *zap.Logger
-	partSize     int64
-	writtenBytes int64
+	logger           *zap.Logger
+	partSize         int64
+	writtenBytes     int64
+	lastLogWriteSize uint64
 }
 
 // lazyInitWriter inits the underlying dataFile/statFile path, dataWriter/statWriter
@@ -160,6 +169,17 @@ func (w *OneFileWriter) InitPartSizeAndLogger(ctx context.Context, partSize int6
 
 // WriteRow implements ingest.Writer.
 func (w *OneFileWriter) WriteRow(ctx context.Context, idxKey, idxVal []byte) error {
+	defer func() {
+		if (w.totalSize-w.lastLogWriteSize)/uint64(w.partSize) >= logPartNumInterval {
+			w.logger.Info("one file writer progress",
+				zap.String("writerID", w.writerID),
+				zap.Int64("partSize", w.partSize),
+				zap.Uint64("totalSize", w.totalSize),
+				zap.Uint64("estimatePartNum", w.totalSize/uint64(w.partSize)),
+			)
+			w.lastLogWriteSize = w.totalSize
+		}
+	}()
 	if w.onDup != engineapi.OnDuplicateKeyIgnore {
 		// must be Record or Remove right now
 		return w.handleDupAndWrite(ctx, idxKey, idxVal)
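
For reference, below is a minimal, self-contained Go sketch of the updated part-size formula. The constant names mirror those referenced in the patch, but every concrete value here (the 5 MiB minimum, the 250 files-per-thread budget, the 512 MiB memory-per-concurrency figure) is an illustrative assumption, not a TiDB default. The point it demonstrates: dividing the worst-case per-thread data volume by MaxUploadPartCount = 5000 instead of 10000 doubles the part size, so the estimated part count lands at half the S3 hard limit. With the old divisor the same inputs yield exactly 10,000 parts, right at the boundary where the "TotalPartsExceeded" error fires.

package main

import "fmt"

// Assumed values for illustration only; names mirror the patch's constants.
const (
	minUploadPartSize        = int64(5 * 1024 * 1024) // S3 parts must be at least 5 MiB
	maxMergingFilesPerThread = 250                    // assumed files-per-thread budget
	maxUploadPartCount       = 5000                   // the divisor this patch introduces
	s3PartLimit              = 10000                  // S3 rejects uploads with more parts
)

// partSizeFor reproduces the shape of the updated formula: size each part so
// that the worst-case merged output splits into at most maxUploadPartCount
// parts, i.e. half of the S3 hard limit.
func partSizeFor(memSizePerCon int64) int64 {
	return max(minUploadPartSize, memSizePerCon*maxMergingFilesPerThread/maxUploadPartCount)
}

func main() {
	memSizePerCon := int64(512 << 20) // assume 512 MiB of memory per concurrency
	partSize := partSizeFor(memSizePerCon)
	worstCaseBytes := memSizePerCon * maxMergingFilesPerThread
	fmt.Printf("partSize=%d MiB, estimated parts=%d (S3 limit %d)\n",
		partSize>>20, worstCaseBytes/partSize, s3PartLimit)
}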
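Likewise, a small sketch of the throttling scheme behind the new defer in WriteRow. The progressLogger type and the simulated row/part sizes are hypothetical, introduced only for this example; the arithmetic matches the patch: log only after roughly logPartNumInterval parts' worth of new bytes have accumulated since the previous log line, so a writer producing thousands of parts emits a handful of progress lines instead of one per row.

package main

import "log"

const logPartNumInterval = 999 // same throttling constant the patch adds

// progressLogger is a hypothetical stand-in for the bookkeeping fields the
// patch adds to OneFileWriter (partSize, totalSize, lastLogWriteSize).
type progressLogger struct {
	partSize         uint64
	totalSize        uint64
	lastLogWriteSize uint64
}

// add records n newly written bytes and logs at most once per
// logPartNumInterval estimated parts, mirroring the defer in WriteRow.
func (p *progressLogger) add(n uint64) {
	p.totalSize += n
	if (p.totalSize-p.lastLogWriteSize)/p.partSize >= logPartNumInterval {
		log.Printf("progress: totalSize=%d estimatePartNum=%d",
			p.totalSize, p.totalSize/p.partSize)
		p.lastLogWriteSize = p.totalSize
	}
}

func main() {
	p := &progressLogger{partSize: 16 << 20} // assume 16 MiB parts
	for i := 0; i < 4_000_000; i++ {
		p.add(32 << 10) // simulate 32 KiB rows; ~7800 parts, only a few log lines
	}
}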