diff --git a/kv/new_memdb_arena.go b/kv/new_memdb_arena.go index 762d30ceef..46c83d1d09 100644 --- a/kv/new_memdb_arena.go +++ b/kv/new_memdb_arena.go @@ -16,6 +16,7 @@ package kv import ( "encoding/binary" "math" + "unsafe" ) var ( @@ -35,11 +36,13 @@ func (addr memdbArenaAddr) isNull() bool { // store and load is used by vlog, due to pointer in vlog is not aligned. func (addr memdbArenaAddr) store(dst []byte) { - panic("todo") + endian.PutUint32(dst, addr.idx) + endian.PutUint32(dst[4:], addr.off) } func (addr *memdbArenaAddr) load(src []byte) { - panic("todo") + addr.idx = endian.Uint32(src) + addr.off = endian.Uint32(src[4:]) } type memdbArena struct { @@ -48,19 +51,52 @@ type memdbArena struct { } func (a *memdbArena) alloc(size int, align bool) (memdbArenaAddr, []byte) { - panic("todo") + if size > maxBlockSize { + panic("alloc size is larger than max block size") + } + + if len(a.blocks) == 0 { + a.enlarge(size, initBlockSize) + } + + addr, data := a.allocInLastBlock(size, align) + if !addr.isNull() { + return addr, data + } + + a.enlarge(size, a.blockSize<<1) + return a.allocInLastBlock(size, align) } func (a *memdbArena) enlarge(allocSize, blockSize int) { - panic("todo") + a.blockSize = blockSize + for a.blockSize <= allocSize { + a.blockSize <<= 1 + } + // Size will never larger than maxBlockSize. + if a.blockSize > maxBlockSize { + a.blockSize = maxBlockSize + } + a.blocks = append(a.blocks, memdbArenaBlock{ + buf: make([]byte, a.blockSize), + }) } func (a *memdbArena) allocInLastBlock(size int, align bool) (memdbArenaAddr, []byte) { - panic("todo") + idx := len(a.blocks) - 1 + offset, data := a.blocks[idx].alloc(size, align) + if offset == nullBlockOffset { + return nullAddr, nil + } + return memdbArenaAddr{uint32(idx), offset}, data } func (a *memdbArena) reset() { - panic("todo") + for i := range a.blocks { + a.blocks[i].reset() + } + a.blocks = a.blocks[:0] + a.blockSize = 0 } type memdbArenaBlock struct { @@ -69,11 +105,23 @@ type memdbArenaBlock struct { } func (a *memdbArenaBlock) alloc(size int, align bool) (uint32, []byte) { - panic("todo") + offset := a.length + if align { + // We must align the allocated address for node + // to make runtime.checkptrAlignment happy. + offset = (a.length + 7) & alignMask + } + newLen := offset + size + if newLen > len(a.buf) { + return nullBlockOffset, nil + } + a.length = newLen + return uint32(offset), a.buf[offset : offset+size] } func (a *memdbArenaBlock) reset() { - panic("todo") + a.buf = nil + a.length = 0 } type memdbCheckpoint struct { @@ -83,15 +131,29 @@ type memdbCheckpoint struct { } func (cp *memdbCheckpoint) isSamePosition(other *memdbCheckpoint) bool { - panic("todo") + return cp.blocks == other.blocks && cp.offsetInBlock == other.offsetInBlock } func (a *memdbArena) checkpoint() memdbCheckpoint { - panic("todo") + snap := memdbCheckpoint{ + blockSize: a.blockSize, + blocks: len(a.blocks), + } + if len(a.blocks) > 0 { + snap.offsetInBlock = a.blocks[len(a.blocks)-1].length + } + return snap } func (a *memdbArena) truncate(snap *memdbCheckpoint) { - panic("todo") + for i := snap.blocks; i < len(a.blocks); i++ { + a.blocks[i] = memdbArenaBlock{} + } + a.blocks = a.blocks[:snap.blocks] + if len(a.blocks) > 0 { + a.blocks[len(a.blocks)-1].length = snap.offsetInBlock + } + a.blockSize = snap.blockSize } type nodeAllocator struct { @@ -104,7 +166,12 @@ type nodeAllocator struct { } func (a *nodeAllocator) init() { - panic("todo") + a.nullNode = memdbNode{ + up: nullAddr, + left: nullAddr, + right: nullAddr, + vptr: nullAddr, + } } func (a *nodeAllocator) getNode(addr memdbArenaAddr) *memdbNode { @@ -112,17 +179,39 @@ func (a *nodeAllocator) getNode(addr memdbArenaAddr) *memdbNode { return &a.nullNode } - panic("todo") + return (*memdbNode)(unsafe.Pointer(&a.blocks[addr.idx].buf[addr.off])) } func (a *nodeAllocator) allocNode(key Key) (memdbArenaAddr, *memdbNode) { - panic("todo") + nodeSize := 8*4 + 2 + 1 + len(key) + addr, mem := a.alloc(nodeSize, true) + n := (*memdbNode)(unsafe.Pointer(&mem[0])) + n.vptr = nullAddr + n.klen = uint16(len(key)) + copy(n.getKey(), key) + return addr, n } -func (a *nodeAllocator) freeNode(addr memdbArenaAddr) {} +var testMode = false + +func (a *nodeAllocator) freeNode(addr memdbArenaAddr) { + if testMode { + // Make it easier for debug. + n := a.getNode(addr) + badAddr := nullAddr + badAddr.idx-- + n.left = badAddr + n.right = badAddr + n.up = badAddr + n.vptr = badAddr + return + } + // TODO: reuse freed nodes. +} func (a *nodeAllocator) reset() { - panic("todo") + a.memdbArena.reset() + a.init() } type memdbVlog struct { @@ -137,22 +226,117 @@ type memdbVlogHdr struct { valueLen uint32 } +func (hdr *memdbVlogHdr) store(dst []byte) { + cursor := 0 + endian.PutUint32(dst[cursor:], hdr.valueLen) + cursor += 4 + hdr.oldValue.store(dst[cursor:]) + cursor += 8 + hdr.nodeAddr.store(dst[cursor:]) +} + +func (hdr *memdbVlogHdr) load(src []byte) { + cursor := 0 + hdr.valueLen = endian.Uint32(src[cursor:]) + cursor += 4 + hdr.oldValue.load(src[cursor:]) + cursor += 8 + hdr.nodeAddr.load(src[cursor:]) +} + func (l *memdbVlog) appendValue(nodeAddr memdbArenaAddr, oldValue memdbArenaAddr, value []byte) memdbArenaAddr { - panic("todo") + size := memdbVlogHdrSize + len(value) + addr, mem := l.alloc(size, false) + + copy(mem, value) + hdr := memdbVlogHdr{nodeAddr, oldValue, uint32(len(value))} + hdr.store(mem[len(value):]) + + addr.off += uint32(size) + return addr } func (l *memdbVlog) getValue(addr memdbArenaAddr) []byte { - panic("todo") + lenOff := addr.off - memdbVlogHdrSize + block := l.blocks[addr.idx].buf + valueLen := endian.Uint32(block[lenOff:]) + if valueLen == 0 { + return tombstone + } + valueOff := lenOff - valueLen + return block[valueOff:lenOff] } func (l *memdbVlog) revertToCheckpoint(db *memdb, cp *memdbCheckpoint) { - panic("todo") + cursor := l.checkpoint() + for !cp.isSamePosition(&cursor) { + hdrOff := cursor.offsetInBlock - memdbVlogHdrSize + block := l.blocks[cursor.blocks-1].buf + var hdr memdbVlogHdr + hdr.load(block[hdrOff:]) + node := db.getNode(hdr.nodeAddr) + + node.vptr = hdr.oldValue + db.size -= int(hdr.valueLen) + // oldValue.isNull() == true means this is a newly added value. + if hdr.oldValue.isNull() { + db.count-- + db.size -= int(node.klen) + // If there are no flags associated with this key, we need to delete this node. + keptFlags := node.getKeyFlags() & persistentFlags + if keptFlags == 0 { + db.deleteNode(node) + } else { + node.setKeyFlags(keptFlags) + db.dirty = true + } + } else { + db.size += len(l.getValue(hdr.oldValue)) + } + + l.moveBackCursor(&cursor, &hdr) + } } func (l *memdbVlog) inspectKVInLog(db *memdb, head, tail *memdbCheckpoint, f func(Key, NewKeyFlags, []byte)) { - panic("todo") + cursor := *tail + for !head.isSamePosition(&cursor) { + cursorAddr := memdbArenaAddr{idx: uint32(cursor.blocks - 1), off: uint32(cursor.offsetInBlock)} + hdrOff := cursorAddr.off - memdbVlogHdrSize + block := l.blocks[cursorAddr.idx].buf + var hdr memdbVlogHdr + hdr.load(block[hdrOff:]) + node := db.allocator.getNode(hdr.nodeAddr) + + // Skip older versions. + if node.vptr == cursorAddr { + value := block[hdrOff-hdr.valueLen : hdrOff] + f(node.getKey(), node.getKeyFlags(), value) + } + + l.moveBackCursor(&cursor, &hdr) + } +} + +func (l *memdbVlog) moveBackCursor(cursor *memdbCheckpoint, hdr *memdbVlogHdr) { + cursor.offsetInBlock -= (memdbVlogHdrSize + int(hdr.valueLen)) + if cursor.offsetInBlock == 0 { + cursor.blocks-- + if cursor.blocks > 0 { + cursor.offsetInBlock = l.blocks[cursor.blocks-1].length + } + } } func (l *memdbVlog) canModify(cp *memdbCheckpoint, addr memdbArenaAddr) bool { - panic("todo") + if cp == nil { + return true + } + if int(addr.idx) > cp.blocks-1 { + return true + } + if int(addr.idx) == cp.blocks-1 && int(addr.off) > cp.offsetInBlock { + return true + } + return false } diff --git a/kv/new_memdb_iterator.go b/kv/new_memdb_iterator.go new file mode 100644 index 0000000000..c0246d0978 --- /dev/null +++ b/kv/new_memdb_iterator.go @@ -0,0 +1,162 @@ +// Copyright 2020 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// See the License for the specific language governing permissions and +// limitations under the License. + +package kv + +import "bytes" + +type memdbIterator struct { + db *memdb + curr memdbNodeAddr + start Key + end Key + reverse bool + includeFlags bool +} + +func (db *memdb) Iter(k Key, upperBound Key) (Iterator, error) { + i := &memdbIterator{ + db: db, + start: k, + end: upperBound, + } + i.init() + return i, nil +} + +func (db *memdb) IterReverse(k Key) (Iterator, error) { + i := &memdbIterator{ + db: db, + end: k, + reverse: true, + } + i.init() + return i, nil +} + +func (i *memdbIterator) init() { + if i.reverse { + if len(i.end) == 0 { + i.seekToLast() + } else { + i.seek(i.end) + } + } else { + if len(i.start) == 0 { + i.seekToFirst() + } else { + i.seek(i.start) + } + } + + if i.isFlagsOnly() && !i.includeFlags { + err := i.Next() + _ = err // memdbIterator will never fail + } +} + +func (i *memdbIterator) Valid() bool { + if !i.reverse { + return !i.curr.isNull() && (i.end == nil || bytes.Compare(i.Key(), i.end) < 0) + } + return !i.curr.isNull() +} + +func (i *memdbIterator) Key() Key { + return i.curr.getKey() +} + +func (i *memdbIterator) Value() []byte { + return i.db.vlog.getValue(i.curr.vptr) +} + +func (i *memdbIterator) Next() error { + for { + if i.reverse { + i.curr = i.db.predecessor(i.curr) + } else { + i.curr = i.db.successor(i.curr) + } + + // We need to skip persistent flags only nodes. + if i.includeFlags || !i.isFlagsOnly() { + break + } + } + return nil +} + +func (i *memdbIterator) Close() {} + +func (i *memdbIterator) seekToFirst() { + y := memdbNodeAddr{nil, nullAddr} + x := i.db.getNode(i.db.root) + + for !x.isNull() { + y = x + x = y.getLeft(i.db) + } + + i.curr = y +} + +func (i *memdbIterator) seekToLast() { + y := memdbNodeAddr{nil, nullAddr} + x := i.db.getNode(i.db.root) + + for !x.isNull() { + y = x + x = y.getRight(i.db) + } + + i.curr = y +} + +func (i *memdbIterator) seek(key Key) { + y := memdbNodeAddr{nil, nullAddr} + x := i.db.getNode(i.db.root) + + var cmp int + for !x.isNull() { + y = x + cmp = bytes.Compare(key, y.getKey()) + + if cmp < 0 { + x = y.getLeft(i.db) + } else if cmp > 0 { + x = y.getRight(i.db) + } else { + break + } + } + + if !i.reverse { + if cmp > 0 { + // Move to next + i.curr = i.db.successor(y) + return + } + i.curr = y + return + } + + if cmp <= 0 && !y.isNull() { + i.curr = i.db.predecessor(y) + return + } + i.curr = y +} + +func (i *memdbIterator) isFlagsOnly() bool { + return !i.curr.isNull() && i.curr.vptr.isNull() +}