From 83c7a9622197abeed3a729d526fcb4e02269dc71 Mon Sep 17 00:00:00 2001
From: GMHDBJD <35025882+GMHDBJD@users.noreply.github.com>
Date: Fri, 7 Jul 2023 18:29:11 +0800
Subject: [PATCH] import/add-index: fix race and performance regression
 (#45147)

close pingcap/tidb#44584
---
 br/pkg/lightning/backend/local/engine.go | 59 ++++++++++++++----------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/br/pkg/lightning/backend/local/engine.go b/br/pkg/lightning/backend/local/engine.go
index c8aa6a31e4..2e0f274650 100644
--- a/br/pkg/lightning/backend/local/engine.go
+++ b/br/pkg/lightning/backend/local/engine.go
@@ -486,16 +486,7 @@ func (e *Engine) getEngineFileSize() backend.EngineFileSize {
 	var memSize int64
 	e.localWriters.Range(func(k, v interface{}) bool {
 		w := k.(*Writer)
-		w.Lock()
-		defer w.Unlock()
-		if w.writer != nil {
-			memSize += int64(w.writer.writer.EstimatedSize())
-		} else {
-			// if kvs are still in memory, only calculate half of the total size
-			// in our tests, SST file size is about 50% of the raw kv size
-			memSize += w.batchSize / 2
-		}
-
+		memSize += int64(w.EstimatedSize())
 		return true
 	})
 
@@ -1009,7 +1000,8 @@ type Writer struct {
 	// if the KVs are append in order, we can directly write the into SST file,
 	// else we must first store them in writeBatch and then batch flush into SST file.
 	isKVSorted bool
-	writer     *sstWriter
+	writer     atomic.Pointer[sstWriter]
+	writerSize atomic.Uint64
 
 	// bytes buffer for writeBatch
 	kvBuffer   *membuf.Buffer
@@ -1020,27 +1012,28 @@ type Writer struct {
 	sortedKeyBuf []byte
 
 	batchCount int
-	batchSize  int64
+	batchSize  atomic.Int64
 
 	lastMetaSeq int32
 
 	tikvCodec tikv.Codec
 }
 
-func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
-	if w.writer == nil {
-		writer, err := w.createSSTWriter()
+func (w *Writer) appendRowsSorted(kvs []common.KvPair) (err error) {
+	writer := w.writer.Load()
+	if writer == nil {
+		writer, err = w.createSSTWriter()
 		if err != nil {
 			return errors.Trace(err)
 		}
-		w.writer = writer
+		w.writer.Store(writer)
 	}
 
 	keyAdapter := w.engine.keyAdapter
 	totalKeySize := 0
 	for i := 0; i < len(kvs); i++ {
 		keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID)
-		w.batchSize += int64(keySize + len(kvs[i].Val))
+		w.batchSize.Add(int64(keySize + len(kvs[i].Val)))
 		totalKeySize += keySize
 	}
 	w.batchCount += len(kvs)
@@ -1059,7 +1052,11 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
 		}
 		kvs = newKvs
 	}
-	return w.writer.writeKVs(kvs)
+	if err := writer.writeKVs(kvs); err != nil {
+		return err
+	}
+	w.writerSize.Store(writer.writer.EstimatedSize())
+	return nil
 }
 
 func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
@@ -1075,7 +1072,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
 			w.isWriteBatchSorted = false
 		}
 		lastKey = pair.Key
-		w.batchSize += int64(len(pair.Key) + len(pair.Val))
+		w.batchSize.Add(int64(len(pair.Key) + len(pair.Val)))
 		buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
 		key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
 		val := w.kvBuffer.AddBytes(pair.Val)
@@ -1089,7 +1086,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) er
 	}
 	w.batchCount = cnt
 
-	if w.batchSize > w.memtableSizeLimit {
+	if w.batchSize.Load() > w.memtableSizeLimit {
 		if err := w.flushKVs(ctx); err != nil {
 			return err
 		}
@@ -1116,7 +1113,7 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
 	defer w.Unlock()
 
 	// if chunk has _tidb_rowid field, we can't ensure that the rows are sorted.
-	if w.isKVSorted && w.writer == nil {
+	if w.isKVSorted && w.writer.Load() == nil {
 		for _, c := range columnNames {
 			if c == model.ExtraHandleName.L {
 				w.isKVSorted = false
@@ -1143,12 +1140,14 @@ func (w *Writer) flush(ctx context.Context) error {
 		}
 	}
 
-	if w.writer != nil {
-		meta, err := w.writer.close()
+	writer := w.writer.Load()
+	if writer != nil {
+		meta, err := writer.close()
 		if err != nil {
 			return errors.Trace(err)
 		}
-		w.writer = nil
+		w.writer.Store(nil)
+		w.writerSize.Store(0)
 		w.batchCount = 0
 		if meta != nil && meta.totalSize > 0 {
 			return w.addSST(ctx, meta)
@@ -1158,6 +1157,16 @@ func (w *Writer) flush(ctx context.Context) error {
 	return nil
 }
 
+// EstimatedSize returns the estimated size of the SST file.
+func (w *Writer) EstimatedSize() uint64 {
+	if size := w.writerSize.Load(); size > 0 {
+		return size
+	}
+	// if kvs are still in memory, only calculate half of the total size
+	// in our tests, SST file size is about 50% of the raw kv size
+	return uint64(w.batchSize.Load()) / 2
+}
+
 type flushStatus struct {
 	local *Engine
 	seq   int32
@@ -1219,7 +1228,7 @@ func (w *Writer) flushKVs(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
-	w.batchSize = 0
+	w.batchSize.Store(0)
 	w.batchCount = 0
 	w.kvBuffer.Reset()
 	return nil
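
Note on the pattern (not part of the patch itself): the fix replaces the
mutex-guarded fields that getEngineFileSize used to read (writer, batchSize)
with atomics (atomic.Pointer[sstWriter], atomic.Uint64, atomic.Int64) and
caches the SST writer's EstimatedSize after each writeKVs, so size polling is
lock-free and no longer contends with appends that hold the Writer mutex.
Below is a minimal standalone sketch of that pattern; sketchWriter, the
128-byte KV size, and the loop counts are illustrative stand-ins, not code
from this repository:

	package main

	import (
		"fmt"
		"sync"
		"sync/atomic"
	)

	// sketchWriter mirrors the shape of the patched Writer: the running
	// sizes are atomics, so a size poller never takes the writer's mutex.
	type sketchWriter struct {
		writerSize atomic.Uint64 // last estimate reported by the SST writer
		batchSize  atomic.Int64  // raw KV bytes still buffered in memory
	}

	// EstimatedSize mirrors the method added by the patch: prefer the
	// cached SST writer estimate; otherwise assume SST output is roughly
	// half of the raw KV size.
	func (w *sketchWriter) EstimatedSize() uint64 {
		if size := w.writerSize.Load(); size > 0 {
			return size
		}
		return uint64(w.batchSize.Load()) / 2
	}

	func main() {
		w := &sketchWriter{}
		var wg sync.WaitGroup

		// Appender: grows the buffered size, as appendRowsUnsorted does.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 1000; i++ {
				w.batchSize.Add(128) // pretend each KV pair is 128 bytes
			}
		}()

		// Poller: reads the estimate concurrently and lock-free, as
		// getEngineFileSize now does.
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 10; i++ {
				_ = w.EstimatedSize()
			}
		}()

		wg.Wait()
		fmt.Println("final estimate:", w.EstimatedSize())
	}

Running the sketch under the race detector (go run -race) reports no races,
whereas the same counters as plain int64 fields without a mutex would.
Caching the estimate into writerSize after each write is what lets the poller
avoid touching the underlying SST writer at all.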