import/add-index: fix race and performance regression (#45147)
close pingcap/tidb#44584
@@ -486,16 +486,7 @@ func (e *Engine) getEngineFileSize() backend.EngineFileSize {
 	var memSize int64
 	e.localWriters.Range(func(k, v interface{}) bool {
 		w := k.(*Writer)
-		w.Lock()
-		defer w.Unlock()
-		if w.writer != nil {
-			memSize += int64(w.writer.writer.EstimatedSize())
-		} else {
-			// if kvs are still in memory, only calculate half of the total size
-			// in our tests, SST file size is about 50% of the raw kv size
-			memSize += w.batchSize / 2
-		}
-
+		memSize += int64(w.EstimatedSize())
 		return true
 	})
 
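The hunk above replaces the per-writer lock/inspect pattern with a single call to the new `Writer.EstimatedSize` (added further down). A minimal runnable sketch of the resulting pattern, with illustrative names rather than the real TiDB types: each writer publishes its estimated size through an atomic, so the engine's size query reads it without taking any writer's mutex.

```go
// Minimal sketch (illustrative names, not the TiDB code): the engine
// iterates writers via sync.Map.Range and reads an atomically published
// size, so the size query never blocks on a writer lock.
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

type writer struct {
	estimatedSize atomic.Uint64 // published by the write path
}

func (w *writer) EstimatedSize() uint64 { return w.estimatedSize.Load() }

func main() {
	var localWriters sync.Map // keys are *writer, values unused
	for i := 1; i <= 3; i++ {
		w := &writer{}
		w.estimatedSize.Store(uint64(i * 100))
		localWriters.Store(w, struct{}{})
	}

	var memSize int64
	localWriters.Range(func(k, _ interface{}) bool {
		// no Lock/Unlock on the writer: the atomic load is race-free
		memSize += int64(k.(*writer).EstimatedSize())
		return true
	})
	fmt.Println(memSize) // 600
}
```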
@@ -1009,7 +1000,8 @@ type Writer struct {
 	// if the KVs are append in order, we can directly write the into SST file,
 	// else we must first store them in writeBatch and then batch flush into SST file.
 	isKVSorted bool
-	writer     *sstWriter
+	writer     atomic.Pointer[sstWriter]
+	writerSize atomic.Uint64
 
 	// bytes buffer for writeBatch
 	kvBuffer *membuf.Buffer
@@ -1020,27 +1012,28 @@ type Writer struct {
 	sortedKeyBuf []byte
 
 	batchCount int
-	batchSize  int64
+	batchSize  atomic.Int64
 
 	lastMetaSeq int32
 
 	tikvCodec tikv.Codec
 }
 
-func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
-	if w.writer == nil {
-		writer, err := w.createSSTWriter()
+func (w *Writer) appendRowsSorted(kvs []common.KvPair) (err error) {
+	writer := w.writer.Load()
+	if writer == nil {
+		writer, err = w.createSSTWriter()
 		if err != nil {
 			return errors.Trace(err)
 		}
-		w.writer = writer
+		w.writer.Store(writer)
 	}
 
 	keyAdapter := w.engine.keyAdapter
 	totalKeySize := 0
 	for i := 0; i < len(kvs); i++ {
 		keySize := keyAdapter.EncodedLen(kvs[i].Key, kvs[i].RowID)
-		w.batchSize += int64(keySize + len(kvs[i].Val))
+		w.batchSize.Add(int64(keySize + len(kvs[i].Val)))
 		totalKeySize += keySize
 	}
 	w.batchCount += len(kvs)
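A standalone sketch of the `atomic.Pointer` handoff this hunk introduces (requires Go 1.19+; the types below are placeholders, not the real `sstWriter`): the append path installs the writer lazily with `Store`, and readers observe it with `Load` instead of dereferencing a mutex-guarded field.

```go
// Sketch of the lazy-init pattern from appendRowsSorted, under assumed
// placeholder types: Store installs the writer once, Load observes it.
package main

import (
	"fmt"
	"sync/atomic"
)

type sstWriter struct{ name string }

type writerState struct {
	writer    atomic.Pointer[sstWriter]
	batchSize atomic.Int64
}

func (w *writerState) append(kvBytes int64) {
	// lazily create the SST writer on first append, as in the hunk above
	if w.writer.Load() == nil {
		w.writer.Store(&sstWriter{name: "placeholder.sst"})
	}
	w.batchSize.Add(kvBytes)
}

func main() {
	w := &writerState{}
	fmt.Println(w.writer.Load() == nil) // true: nothing installed yet
	w.append(128)
	w.append(64)
	fmt.Println(w.writer.Load().name, w.batchSize.Load()) // placeholder.sst 192
}
```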
@@ -1059,7 +1052,11 @@ func (w *Writer) appendRowsSorted(kvs []common.KvPair) error {
 		}
 		kvs = newKvs
 	}
-	return w.writer.writeKVs(kvs)
+	if err := writer.writeKVs(kvs); err != nil {
+		return err
+	}
+	w.writerSize.Store(writer.writer.EstimatedSize())
+	return nil
 }
 
 func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
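The `Store` of `writer.writer.EstimatedSize()` after each batch is what lets `getEngineFileSize` drop the writer lock. A small sketch of that publish/poll pattern, under assumed simplified types: the write path updates an atomic size after each batch (under its own lock), and a monitor reads it concurrently without ever acquiring that lock.

```go
// Sketch of the publish/poll pattern (not the TiDB implementation): the
// writer publishes its running size atomically; a monitor polls lock-free.
package main

import (
	"fmt"
	"runtime"
	"sync"
	"sync/atomic"
)

type batchWriter struct {
	mu    sync.Mutex
	total int
	size  atomic.Uint64 // published after every write under mu
}

func (w *batchWriter) write(n int) {
	w.mu.Lock()
	defer w.mu.Unlock()
	w.total += n
	w.size.Store(uint64(w.total)) // publish for lock-free readers
}

func main() {
	w := &batchWriter{}
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 100; i++ {
			w.write(10)
		}
	}()
	for w.size.Load() < 1000 { // poll without w.mu; no data race on size
		runtime.Gosched()
	}
	wg.Wait()
	fmt.Println("observed size:", w.size.Load()) // observed size: 1000
}
```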
@@ -1075,7 +1072,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
 			w.isWriteBatchSorted = false
 		}
 		lastKey = pair.Key
-		w.batchSize += int64(len(pair.Key) + len(pair.Val))
+		w.batchSize.Add(int64(len(pair.Key) + len(pair.Val)))
 		buf := w.kvBuffer.AllocBytes(keyAdapter.EncodedLen(pair.Key, pair.RowID))
 		key := keyAdapter.Encode(buf[:0], pair.Key, pair.RowID)
 		val := w.kvBuffer.AddBytes(pair.Val)
@@ -1089,7 +1086,7 @@ func (w *Writer) appendRowsUnsorted(ctx context.Context, kvs []common.KvPair) error {
 	}
 	w.batchCount = cnt
 
-	if w.batchSize > w.memtableSizeLimit {
+	if w.batchSize.Load() > w.memtableSizeLimit {
 		if err := w.flushKVs(ctx); err != nil {
 			return err
 		}
@@ -1116,7 +1113,7 @@ func (w *Writer) AppendRows(ctx context.Context, columnNames []string, rows enco
 	defer w.Unlock()
 
 	// if chunk has _tidb_rowid field, we can't ensure that the rows are sorted.
-	if w.isKVSorted && w.writer == nil {
+	if w.isKVSorted && w.writer.Load() == nil {
 		for _, c := range columnNames {
 			if c == model.ExtraHandleName.L {
 				w.isKVSorted = false
@@ -1143,12 +1140,14 @@ func (w *Writer) flush(ctx context.Context) error {
 		}
 	}
 
-	if w.writer != nil {
-		meta, err := w.writer.close()
+	writer := w.writer.Load()
+	if writer != nil {
+		meta, err := writer.close()
 		if err != nil {
 			return errors.Trace(err)
 		}
-		w.writer = nil
+		w.writer.Store(nil)
+		w.writerSize.Store(0)
 		w.batchCount = 0
 		if meta != nil && meta.totalSize > 0 {
 			return w.addSST(ctx, meta)
@@ -1158,6 +1157,16 @@ func (w *Writer) flush(ctx context.Context) error {
 	return nil
 }
 
+// EstimatedSize returns the estimated size of the SST file.
+func (w *Writer) EstimatedSize() uint64 {
+	if size := w.writerSize.Load(); size > 0 {
+		return size
+	}
+	// if kvs are still in memory, only calculate half of the total size
+	// in our tests, SST file size is about 50% of the raw kv size
+	return uint64(w.batchSize.Load()) / 2
+}
+
 type flushStatus struct {
 	local *Engine
 	seq   int32
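The new `EstimatedSize` method can be exercised in isolation. A self-contained toy version with the same two regimes (placeholder struct, not the TiDB `Writer`): once `writerSize` has been published it wins; before that, the raw in-memory KV bytes are halved, per the ~50% SST-to-raw ratio noted in the comment.

```go
// Toy version of the EstimatedSize heuristic above, under a placeholder
// struct: prefer the SST writer's published estimate, else halve raw bytes.
package main

import (
	"fmt"
	"sync/atomic"
)

type writerState struct {
	writerSize atomic.Uint64
	batchSize  atomic.Int64
}

func (w *writerState) EstimatedSize() uint64 {
	if size := w.writerSize.Load(); size > 0 {
		return size
	}
	return uint64(w.batchSize.Load()) / 2
}

func main() {
	w := &writerState{}
	w.batchSize.Store(1024)
	fmt.Println(w.EstimatedSize()) // 512: KVs still in memory, halved
	w.writerSize.Store(700)
	fmt.Println(w.EstimatedSize()) // 700: SST writer's estimate wins
}
```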
@@ -1219,7 +1228,7 @@ func (w *Writer) flushKVs(ctx context.Context) error {
 		return errors.Trace(err)
 	}
 
-	w.batchSize = 0
+	w.batchSize.Store(0)
 	w.batchCount = 0
 	w.kvBuffer.Reset()
 	return nil