compactindex: optimize

Richard Patel 2022-11-14 04:35:32 +01:00
parent a5fada037d
commit 2bbdb6612a
4 changed files with 91 additions and 44 deletions

View File

@@ -13,11 +13,15 @@ import (
"os"
"path/filepath"
"sort"
"sync/atomic"
"golang.org/x/sync/errgroup"
)
// Builder creates new compactindex files.
type Builder struct {
Header
Workers int
buckets []tempBucket
dir string
}
@@ -59,6 +63,7 @@ func NewBuilder(dir string, numItems uint, targetFileSize uint64) (*Builder, err
FileSize: targetFileSize,
NumBuckets: uint32(numBuckets),
},
Workers: 1,
buckets: buckets,
dir: dir,
}, nil
@@ -87,23 +92,49 @@ func (b *Builder) Seal(ctx context.Context, f *os.File) (err error) {
if err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
// Create hole to leave space for bucket header table.
// List offsets.
bucketTableLen := int64(b.NumBuckets) * bucketHdrLen
err = fallocate(f, headerSize, bucketTableLen)
if err != nil {
return fmt.Errorf("failed to fallocate() bucket table: %w", err)
bucketOffsets := make([]int64, b.NumBuckets)
offset := headerSize + bucketTableLen
for i, bucket := range b.buckets {
bucketOffsets[i] = offset
offset += int64(bucket.records) * (3 + int64(intWidth(b.FileSize))) // TODO hardcoded
}
// Pre-allocate file.
if err := fallocate(f, headerSize, offset-headerSize); err != nil {
return fmt.Errorf("failed to pre-allocate file: %w", err)
}
// Seal each bucket.
for i := range b.buckets {
if err := b.sealBucket(ctx, i, f); err != nil {
return err
}
var bucketIdx atomic.Uint32
group, ctx := errgroup.WithContext(ctx)
for w := 0; w < b.Workers; w++ {
group.Go(func() error {
subFile, err := os.OpenFile(f.Name(), os.O_WRONLY, 0666)
if err != nil {
return err
}
defer subFile.Close()
for {
select {
case <-ctx.Done():
return nil
default:
idx := bucketIdx.Add(1) - 1
if idx >= b.NumBuckets {
return nil
}
if err := b.sealBucket(ctx, int(idx), subFile, bucketOffsets[idx]); err != nil {
return err
}
}
}
})
}
return nil
return group.Wait()
}
// sealBucket will mine a bucket hashtable, write entries to a file, and write the bucket header.
func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File, at int64) error {
// Produce perfect hash table for bucket.
bucket := &b.buckets[i]
if err := bucket.flush(); err != nil {
@@ -114,21 +145,17 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
if err != nil {
return fmt.Errorf("failed to mine bucket %d: %w", i, err)
}
// Find current file length.
offset, err := f.Seek(0, io.SeekEnd)
if err != nil {
// Seek to entries location.
if _, err = f.Seek(at, io.SeekStart); err != nil {
return fmt.Errorf("failed to seek to EOF: %w", err)
}
if offset < 0 {
panic("os.File.Seek() < 0")
}
// Write header to file.
desc := BucketDescriptor{
BucketHeader: BucketHeader{
HashDomain: domain,
NumEntries: uint32(bucket.records),
HashLen: 3, // TODO remove hardcoded constant
FileOffset: uint64(offset),
FileOffset: uint64(at),
},
Stride: 3 + intWidth(b.FileSize), // TODO remove hardcoded constant
OffsetWidth: intWidth(b.FileSize),
@@ -167,10 +194,15 @@ type tempBucket struct {
// writeTuple performs a buffered write of a KV-tuple.
func (b *tempBucket) writeTuple(key []byte, value uint64) (err error) {
b.records++
var static [10]byte
binary.LittleEndian.PutUint16(static[0:2], uint16(len(key)))
binary.LittleEndian.PutUint64(static[2:10], value)
if _, err = b.writer.Write(static[:]); err != nil {
if b.writer.Available() < 10 {
if err := b.writer.Flush(); err != nil {
return err
}
}
buf := b.writer.AvailableBuffer()[:10]
binary.LittleEndian.PutUint16(buf[0:2], uint16(len(key)))
binary.LittleEndian.PutUint64(buf[2:10], value)
if _, err := b.writer.Write(buf); err != nil {
return err
}
_, err = b.writer.Write(key)
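The rewritten writeTuple stages the fixed 10-byte tuple header directly in bufio.Writer's internal buffer via AvailableBuffer (Go 1.18+), flushing first whenever fewer than 10 bytes are free so the slice is guaranteed to fit. A self-contained sketch of the pattern, with illustrative names rather than the package's own:

package main

import (
	"bufio"
	"encoding/binary"
	"io"
)

// writeRecord buffers a 10-byte header (key length + value) followed by the
// key itself, composing the header in w's own buffer to avoid a temporary.
func writeRecord(w *bufio.Writer, key []byte, value uint64) error {
	const hdrLen = 10
	if w.Available() < hdrLen { // make room so AvailableBuffer can hold the header
		if err := w.Flush(); err != nil {
			return err
		}
	}
	buf := w.AvailableBuffer()[:hdrLen] // zero-copy view into w's buffer
	binary.LittleEndian.PutUint16(buf[0:2], uint16(len(key)))
	binary.LittleEndian.PutUint64(buf[2:10], value)
	if _, err := w.Write(buf); err != nil { // copies in place, no allocation
		return err
	}
	_, err := w.Write(key)
	return err
}

func main() {
	w := bufio.NewWriter(io.Discard)
	_ = writeRecord(w, []byte("hello"), 42)
	_ = w.Flush()
}

The header bytes are composed where they will be flushed from, so the old code's temporary array and its extra copy through Write disappear.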
@@ -194,7 +226,16 @@ func (b *tempBucket) mine(ctx context.Context, attempts uint32) (entries []Entry
entries = make([]Entry, b.records)
bitmap := make([]byte, 1<<21)
rd := bufio.NewReader(b.file)
// Read entire file into memory.
info, err := b.file.Stat()
if err != nil {
return nil, 0, fmt.Errorf("failed to stat temp file: %w", err)
}
buf := make([]byte, info.Size())
if _, err := io.ReadFull(io.NewSectionReader(b.file, 0, info.Size()), buf); err != nil {
return nil, 0, fmt.Errorf("failed to read file into memory: %w", err)
}
for domain = uint32(0); domain < attempts; domain++ {
if err = ctx.Err(); err != nil {
return
@@ -203,18 +244,12 @@ func (b *tempBucket) mine(ctx context.Context, attempts uint32) (entries []Entry
for i := range bitmap {
bitmap[i] = 0
}
// Reset reader
if _, err = b.file.Seek(0, io.SeekStart); err != nil {
return
}
rd.Reset(b.file)
if hashErr := hashBucket(rd, entries, bitmap, domain); errors.Is(hashErr, ErrCollision) {
if hashErr := hashBucket(buf, entries, bitmap, domain); errors.Is(hashErr, ErrCollision) {
continue
} else if hashErr != nil {
return nil, 0, hashErr
}
return // ok
}
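mine may try many hash domains before finding one that is collision-free, and previously every attempt re-seeked the temp file and refilled a bufio.Reader. Snapshotting the file once (io.NewSectionReader reads through ReadAt, leaving the file's seek position untouched) turns each retry into a pure in-memory scan. A reduced sketch of that retry shape, with hypothetical names:

package main

import (
	"errors"
	"fmt"
)

var errCollision = errors.New("collision")

// mineDomain probes successive hash domains against an in-memory snapshot;
// the backing file is read once up front, not once per attempt.
func mineDomain(data []byte, attempts uint32, try func([]byte, uint32) error) (uint32, error) {
	for domain := uint32(0); domain < attempts; domain++ {
		err := try(data, domain)
		if errors.Is(err, errCollision) {
			continue // collision: retry with the next domain
		}
		if err != nil {
			return 0, err
		}
		return domain, nil
	}
	return 0, fmt.Errorf("exhausted %d attempts", attempts)
}

func main() {
	domain, err := mineDomain([]byte("snapshot"), 8, func(_ []byte, d uint32) error {
		if d < 3 {
			return errCollision // pretend the first few domains collide
		}
		return nil
	})
	fmt.Println(domain, err) // prints: 3 <nil>
}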
@@ -224,26 +259,26 @@ func (b *tempBucket) mine(ctx context.Context, attempts uint32) (entries []Entry
// hashBucket reads and hashes entries from an in-memory temporary bucket buffer.
//
// Uses a 2^24 wide bitmap to detect collisions.
func hashBucket(rd *bufio.Reader, entries []Entry, bitmap []byte, nonce uint32) error {
func hashBucket(rd []byte, entries []Entry, bitmap []byte, nonce uint32) error {
// TODO Don't hardcode this, choose hash depth dynamically
mask := uint64(0xffffff)
// Scan the provided buffer for entries and hash along the way.
for i := range entries {
// Read next key from file (as defined by writeTuple)
var static [10]byte
if _, err := io.ReadFull(rd, static[:]); err != nil {
return err
}
keyLen := binary.LittleEndian.Uint16(static[0:2])
value := binary.LittleEndian.Uint64(static[2:10])
key := make([]byte, keyLen)
if _, err := io.ReadFull(rd, key); err != nil {
return err
if len(rd) < 10 {
return io.ErrUnexpectedEOF
}
keyLen := binary.LittleEndian.Uint16(rd[0:2])
value := binary.LittleEndian.Uint64(rd[2:10])
rd = rd[10:]
// Hash to entry
hash := EntryHash64(nonce, key) & mask
if len(rd) < int(keyLen) {
return io.ErrUnexpectedEOF
}
hash := EntryHash64(nonce, rd[:keyLen]) & mask
rd = rd[keyLen:]
// Check for collision in bitmap
bi, bj := hash/8, hash%8
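Seal's parallelism rests on two choices visible above: bucket offsets are precomputed so every bucket's destination is known before any worker starts, and each worker opens its own file handle so concurrent writes never race on a shared seek position (WriteAt on one shared handle would be the main alternative). The claim-and-stop distribution pattern, as a standalone sketch with illustrative names:

package main

import (
	"context"
	"fmt"
	"sync/atomic"

	"golang.org/x/sync/errgroup"
)

// runJobs fans n independent jobs out to the given number of workers,
// using an atomic counter as a lock-free work queue: each worker claims
// the next index with Add(1)-1 and exits once the counter passes n.
func runJobs(ctx context.Context, n uint32, workers int, job func(context.Context, uint32) error) error {
	var next atomic.Uint32
	group, ctx := errgroup.WithContext(ctx)
	for w := 0; w < workers; w++ {
		group.Go(func() error {
			for {
				if err := ctx.Err(); err != nil {
					return err // canceled, or another worker failed
				}
				idx := next.Add(1) - 1 // claim the next job
				if idx >= n {
					return nil // queue drained
				}
				if err := job(ctx, idx); err != nil {
					return err
				}
			}
		})
	}
	return group.Wait()
}

func main() {
	err := runJobs(context.Background(), 8, 4, func(_ context.Context, i uint32) error {
		fmt.Println("sealing bucket", i)
		return nil
	})
	fmt.Println("done:", err)
}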

View File

@@ -9,6 +9,7 @@ import (
"math"
"math/rand"
"os"
"runtime"
"testing"
"time"
@@ -179,7 +180,7 @@ func TestBuilder_Random(t *testing.T) {
t.Skip("Skipping long test")
}
const numKeys = uint(500000)
const numKeys = uint(10000000)
const keySize = uint(16)
const maxOffset = uint64(1000000)
const queries = int(10000)
@@ -189,6 +190,7 @@ func TestBuilder_Random(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, builder)
require.NotEmpty(t, builder.buckets)
builder.Workers = runtime.NumCPU()
// Ensure we cleaned up after ourselves.
defer func() {

View File

@@ -226,6 +226,16 @@ func EntryHash64(prefix uint32, key []byte) uint64 {
return digest.Sum64()
}
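// EntryHasher returns an xxhash state pre-seeded with the prefix, padded
// to one full 32-byte block; copies of the returned value can hash many
// keys without re-hashing the prefix.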
func EntryHasher(prefix uint32) (digest xxhash.Digest) {
const blockSize = 32
var prefixBlock [blockSize]byte
binary.LittleEndian.PutUint32(prefixBlock[:4], prefix)
digest.Reset()
digest.Write(prefixBlock[:])
return
}
// Entry is a single element in a hash table.
type Entry struct {
Hash uint64
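EntryHasher pads the 4-byte prefix to exactly one 32-byte xxhash block, so the seeded digest sits on a block boundary and the struct can be copied cheaply by value. Presumably the intent is to amortize the prefix hash across many keys; a hypothetical usage sketch, assuming the cespare/xxhash/v2 package and reproducing the seeding inline so it runs standalone:

package main

import (
	"encoding/binary"
	"fmt"

	"github.com/cespare/xxhash/v2"
)

// entryHasher mirrors EntryHasher above: seed a digest with the prefix
// padded to a single 32-byte xxhash block.
func entryHasher(prefix uint32) (digest xxhash.Digest) {
	var block [32]byte
	binary.LittleEndian.PutUint32(block[:4], prefix)
	digest.Reset()
	digest.Write(block[:])
	return
}

func main() {
	base := entryHasher(7) // prefix hashed once
	for _, key := range [][]byte{[]byte("alpha"), []byte("beta")} {
		d := base // cheap value copy of the seeded state
		d.Write(key)
		fmt.Printf("%s -> %016x\n", key, d.Sum64())
	}
}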

View File

@@ -155,9 +155,10 @@ func (b *Bucket) Lookup(key []byte) (uint64, error) {
func (b *Bucket) binarySearch(target uint64) (uint64, error) {
low := 0
high := int(b.NumEntries) - 1
buf := make([]byte, b.Stride)
for low <= high {
median := (low + high) / 2
entry, err := b.loadEntry(median)
entry, err := b.loadEntry(median, buf)
if err != nil {
return 0, err
}
@@ -172,9 +173,8 @@ func (b *Bucket) binarySearch(target uint64) (uint64, error) {
return 0, ErrNotFound
}
func (b *Bucket) loadEntry(i int) (Entry, error) {
func (b *Bucket) loadEntry(i int, buf []byte) (Entry, error) {
off := int64(i) * int64(b.Stride)
buf := make([]byte, b.Stride)
n, err := b.Entries.ReadAt(buf, off)
if n != len(buf) {
return Entry{}, err
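Passing the scratch buffer into loadEntry turns one allocation per probe into one per lookup; a binary search makes about log2(NumEntries) probes, each of which previously called make. The hoisting pattern in isolation, over a hypothetical fixed-stride record layout:

package main

import (
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

// findValue binary-searches sorted fixed-stride little-endian uint64 records
// in r, reusing one scratch buffer for every probe instead of allocating
// inside the loop.
func findValue(r io.ReaderAt, n int, stride int64, target uint64) (int, error) {
	buf := make([]byte, 8) // one allocation for the whole search
	low, high := 0, n-1
	for low <= high {
		mid := (low + high) / 2
		if _, err := r.ReadAt(buf, int64(mid)*stride); err != nil {
			return -1, err
		}
		switch v := binary.LittleEndian.Uint64(buf); {
		case v == target:
			return mid, nil
		case v < target:
			low = mid + 1
		default:
			high = mid - 1
		}
	}
	return -1, fmt.Errorf("target %d not found", target)
}

func main() {
	var data bytes.Buffer
	for _, v := range []uint64{2, 4, 8, 16} {
		_ = binary.Write(&data, binary.LittleEndian, v)
	}
	idx, err := findValue(bytes.NewReader(data.Bytes()), 4, 8, 8)
	fmt.Println(idx, err) // prints: 2 <nil>
}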