disttask/ddl: change memory quota for writer (#47251)

ref pingcap/tidb#45719
This commit is contained in:
EasonBall
2023-09-25 19:06:27 +08:00
committed by GitHub
parent 39e0a9a0d1
commit a3efafe2c2

View File

@ -45,6 +45,7 @@ import (
"github.com/pingcap/tidb/tablecodec"
"github.com/pingcap/tidb/util/chunk"
"github.com/pingcap/tidb/util/logutil"
"github.com/pingcap/tidb/util/memory"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
"golang.org/x/sync/errgroup"
@ -180,9 +181,14 @@ func NewWriteIndexToExternalStoragePipeline(ctx *OperatorCtx, store kv.Storage,
shareMu = bcctx.GetLocalBackend().GetMutex()
}
memTotal, err := memory.MemTotal()
if err != nil {
return nil, err
}
srcOp := NewTableScanTaskSource(ctx, store, tbl, startKey, endKey)
scanOp := NewTableScanOperator(ctx, sessPool, copCtx, srcChkPool, readerCnt)
writeOp := NewWriteExternalStoreOperator(ctx, copCtx, sessPool, jobID, subtaskID, tbl, index, extStore, srcChkPool, writerCnt, onClose, shareMu)
writeOp := NewWriteExternalStoreOperator(ctx, copCtx, sessPool, jobID, subtaskID, tbl, index, extStore, srcChkPool, writerCnt, onClose, shareMu, memTotal/2)
sinkOp := newIndexWriteResultSink(ctx, nil, tbl, index, totalRowCount, metricCounter)
operator.Compose[TableScanTask](srcOp, scanOp)
@ -449,7 +455,7 @@ type WriteExternalStoreOperator struct {
// NewWriteExternalStoreOperator creates a new WriteExternalStoreOperator.
func NewWriteExternalStoreOperator(ctx *OperatorCtx, copCtx *CopContext, sessPool opSessPool, jobID int64,
subtaskID int64, tbl table.PhysicalTable, index table.Index, store storage.ExternalStorage, srcChunkPool chan *chunk.Chunk,
concurrency int, onClose external.OnCloseFunc, shareMu *sync.Mutex) *WriteExternalStoreOperator {
concurrency int, onClose external.OnCloseFunc, shareMu *sync.Mutex, memoryQuota uint64) *WriteExternalStoreOperator {
pool := workerpool.NewWorkerPool(
"WriteExternalStoreOperator",
util.DDL,
@ -457,7 +463,9 @@ func NewWriteExternalStoreOperator(ctx *OperatorCtx, copCtx *CopContext, sessPoo
func() workerpool.Worker[IndexRecordChunk, IndexWriteResult] {
builder := external.NewWriterBuilder().
SetOnCloseFunc(onClose).
SetKeyDuplicationEncoding(index.Meta().Unique).SetMutex(shareMu)
SetKeyDuplicationEncoding(index.Meta().Unique).
SetMutex(shareMu).
SetMemorySizeLimit(memoryQuota)
writerID := uuid.New().String()
prefix := path.Join(strconv.Itoa(int(jobID)), strconv.Itoa(int(subtaskID)))
writer := builder.Build(store, prefix, writerID)