compactindex: simplify header, fix queries

Richard Patel 2022-11-13 22:37:58 +01:00
parent 67404488a1
commit dd5fa9a000
5 changed files with 236 additions and 60 deletions

View File

@@ -9,6 +9,9 @@ go_library(
"@platforms//os:osx": ["build.go"],
"@platforms//os:linux": ["build.go"],
"//conditions:default": [],
}) + select({
"@platforms//os:linux": ["fallocate_linux.go"],
"//conditions:default": ["fallocate_generic.go"],
}),
importpath = "go.firedancer.io/radiance/pkg/compactindex",
visibility = ["//visibility:public"],
@@ -22,10 +25,14 @@ go_test(
srcs = [
"compactindex_test.go",
"query_test.go",
],
] + select({
"@platforms//os:osx": ["build_test.go"],
"@platforms//os:linux": ["build_test.go"],
"//conditions:default": [],
}),
embed = [":compactindex"],
deps = [
"@com_github_stretchr_testify//assert",
"@com_github_stretchr_testify//require",
]
],
)
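The new select() pulls in a platform-specific fallocate implementation used by Builder.Seal below. Those files are not shown in this diff; a minimal sketch of what they might contain, assuming the Linux variant wraps unix.Fallocate and the generic variant simply extends the file with Truncate:

// fallocate_linux.go (sketch, assumed contents)
//go:build linux

package compactindex

import (
	"os"

	"golang.org/x/sys/unix"
)

// fallocate reserves size bytes at offset without writing zeros through the page cache.
func fallocate(f *os.File, offset int64, size int64) error {
	return unix.Fallocate(int(f.Fd()), 0, offset, size)
}

// fallocate_generic.go (sketch, assumed contents)
//go:build !linux

package compactindex

import "os"

// fallocate fallback: grow the file so the region [offset, offset+size) exists.
func fallocate(f *os.File, offset int64, size int64) error {
	return f.Truncate(offset + size)
}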

View File

@@ -87,8 +87,8 @@ func (b *Builder) Seal(ctx context.Context, f *os.File) (err error) {
if err != nil {
return fmt.Errorf("failed to write header: %w", err)
}
// Create hole to leave space for bucket offset table.
bucketTableLen := int64(b.NumBuckets) * bucketOffsetSize
// Create hole to leave space for bucket header table.
bucketTableLen := int64(b.NumBuckets) * bucketHdrLen
err = fallocate(f, headerSize, bucketTableLen)
if err != nil {
return fmt.Errorf("failed to fallocate() bucket table: %w", err)
@@ -114,10 +114,13 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
if err != nil {
return fmt.Errorf("failed to mine bucket %d: %w", i, err)
}
// Seek to end of file.
bucketOffset, err := f.Seek(0, io.SeekEnd)
// Find current file length.
offset, err := f.Seek(0, io.SeekEnd)
if err != nil {
return fmt.Errorf("failed to seek to index EOF: %w", err)
return fmt.Errorf("failed to seek to EOF: %w", err)
}
if offset < 0 {
panic("os.File.Seek() < 0")
}
// Write header to file.
desc := BucketDescriptor{
@@ -125,15 +128,13 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
HashDomain: domain,
NumEntries: uint32(bucket.records),
HashLen: 3, // TODO remove hardcoded constant
FileOffset: uint64(offset),
},
}
wr := bufio.NewWriter(f)
var headerBuf [bucketHeaderSize]byte
desc.BucketHeader.Store(&headerBuf)
if _, err := wr.Write(headerBuf[:]); err != nil {
return fmt.Errorf("failed to write bucket header: %w", err)
Stride: 3 + intWidth(b.FileSize), // TODO remove hardcoded constant
OffsetWidth: intWidth(b.FileSize),
}
// Write entries to file.
wr := bufio.NewWriter(f)
entryBuf := make([]byte, desc.HashLen+intWidth(b.FileSize)) // TODO remove hardcoded constant
for _, entry := range entries {
desc.storeEntry(entryBuf, entry)
@@ -144,13 +145,9 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
if err := wr.Flush(); err != nil {
return fmt.Errorf("failed to flush bucket to index: %w", err)
}
// Write offset to header.
var bucketOffsetBuf [8]byte
binary.LittleEndian.PutUint64(bucketOffsetBuf[:], uint64(bucketOffset))
pointerOffset := headerSize + (int64(i) * bucketOffsetSize)
_, err = f.WriteAt(bucketOffsetBuf[:bucketOffsetSize], pointerOffset)
if err != nil {
return fmt.Errorf("failed to write bucket offset to header (at %#x => %#x)", pointerOffset, bucketOffset)
// Write header to file.
if err := desc.BucketHeader.writeTo(f, uint(i)); err != nil {
return fmt.Errorf("failed to write bucket header %d: %w", i, err)
}
return nil
}
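With the per-bucket offset table gone, each bucket header now carries its own FileOffset, and entry i lives at FileOffset + i*Stride. A minimal sketch of random access to a single entry under that layout (illustrative helper, not part of this commit):

// readEntryAt reads the i-th entry of a sealed bucket directly from the index stream.
// Not in this commit; the real query path goes through Bucket.Load below.
func readEntryAt(rd io.ReaderAt, desc BucketDescriptor, i uint32) (Entry, error) {
	buf := make([]byte, desc.Stride)
	off := int64(desc.FileOffset) + int64(i)*int64(desc.Stride)
	if _, err := rd.ReadAt(buf, off); err != nil {
		return Entry{}, err
	}
	return desc.loadEntry(buf), nil
}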
@@ -233,13 +230,13 @@ func hashBucket(rd *bufio.Reader, entries []Entry, bitmap []byte, nonce uint32)
// Scan provided reader for entries and hash along the way.
for i := range entries {
// Read next key from file
// Read next key from file (as defined by writeTuple)
var static [10]byte
if _, err := io.ReadFull(rd, static[:]); err != nil {
return err
}
keyLen := binary.LittleEndian.Uint16(static[0:2])
value := binary.LittleEndian.Uint64(static[0:10])
value := binary.LittleEndian.Uint64(static[2:10])
key := make([]byte, keyLen)
if _, err := io.ReadFull(rd, key); err != nil {
return err
@@ -258,8 +255,8 @@ func hashBucket(rd *bufio.Reader, entries []Entry, bitmap []byte, nonce uint32)
// Export entry
entries[i] = Entry{
Hash: hash,
Offset: value,
Hash: hash,
Value: value,
}
}
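The read loop above implies the temp-file tuple layout: a 2-byte little-endian key length, an 8-byte little-endian value, then the raw key bytes (the old code wrongly decoded the value from bytes 0:10 instead of 2:10). A minimal sketch of the matching writer, assuming writeTuple uses exactly this layout:

// writeTuple sketch (assumed layout, mirroring the read path in hashBucket above):
// 2-byte key length, 8-byte value, then the key itself.
func writeTuple(wr io.Writer, key []byte, value uint64) error {
	var static [10]byte
	binary.LittleEndian.PutUint16(static[0:2], uint16(len(key)))
	binary.LittleEndian.PutUint64(static[2:10], value)
	if _, err := wr.Write(static[:]); err != nil {
		return err
	}
	_, err := wr.Write(key)
	return err
}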

View File

@@ -6,6 +6,7 @@ import (
"errors"
"io"
"io/fs"
"math"
"math/rand"
"os"
"testing"
@@ -16,14 +17,176 @@ import (
)
func TestBuilder(t *testing.T) {
const numKeys = uint(1000000)
const numBuckets = 3
const maxValue = math.MaxUint64
// Create a table with 3 buckets.
builder, err := NewBuilder("", numBuckets*targetEntriesPerBucket, maxValue)
require.NoError(t, err)
require.NotNil(t, builder)
assert.Len(t, builder.buckets, 3)
defer builder.Close()
// Insert a few entries.
require.NoError(t, builder.Insert([]byte("hello"), 1))
require.NoError(t, builder.Insert([]byte("world"), 2))
require.NoError(t, builder.Insert([]byte("blub"), 3))
// Create index file.
targetFile, err := os.CreateTemp("", "compactindex-final-")
require.NoError(t, err)
defer os.Remove(targetFile.Name())
defer targetFile.Close()
// Seal index.
require.NoError(t, builder.Seal(context.TODO(), targetFile))
// Assert binary content.
buf, err := os.ReadFile(targetFile.Name())
require.NoError(t, err)
assert.Equal(t, []byte{
// --- File header
// magic
0x72, 0x64, 0x63, 0x65, 0x63, 0x69, 0x64, 0x78,
// max file size
0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
// num buckets
0x03, 0x00, 0x00, 0x00,
// padding
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// --- Bucket header 0
// hash domain
0x00, 0x00, 0x00, 0x00,
// num entries
0x01, 0x00, 0x00, 0x00,
// hash len
0x03,
// padding
0x00,
// file offset
0x50, 0x00, 0x00, 0x00, 0x00, 0x00,
// --- Bucket header 1
// hash domain
0x00, 0x00, 0x00, 0x00,
// num entries
0x00, 0x00, 0x00, 0x00,
// hash len
0x03,
// padding
0x00,
// file offset
0x5b, 0x00, 0x00, 0x00, 0x00, 0x00,
// --- Bucket header 2
// hash domain
0x00, 0x00, 0x00, 0x00,
// num entries
0x02, 0x00, 0x00, 0x00,
// hash len
0x03,
// padding
0x00,
// file offset
0x5b, 0x00, 0x00, 0x00, 0x00, 0x00,
// --- Bucket 0
// hash
0xe2, 0xdb, 0x55,
// value
0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// --- Bucket 2
// hash
0xe3, 0x09, 0x6b,
// value
0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
// hash
0x92, 0xcd, 0xbb,
// value
0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
}, buf)
// Reset file offset.
_, seekErr := targetFile.Seek(0, io.SeekStart)
require.NoError(t, seekErr)
// Open index.
db, err := Open(targetFile)
require.NoError(t, err, "Failed to open generated index")
require.NotNil(t, db)
// File header assertions.
assert.Equal(t, Header{
FileSize: maxValue,
NumBuckets: numBuckets,
}, db.Header)
// Get bucket handles.
buckets := make([]*Bucket, numBuckets)
for i := range buckets {
buckets[i], err = db.GetBucket(uint(i))
require.NoError(t, err)
}
// Ensure out-of-bounds bucket accesses fail.
_, wantErr := db.GetBucket(numBuckets)
assert.EqualError(t, wantErr, "out of bounds bucket index: 3 >= 3")
// Bucket header assertions.
assert.Equal(t, BucketDescriptor{
BucketHeader: BucketHeader{
HashDomain: 0x00,
NumEntries: 1,
HashLen: 3,
FileOffset: 0x50,
},
Stride: 11, // 3 + 8
OffsetWidth: 8,
}, buckets[0].BucketDescriptor)
assert.Equal(t, BucketHeader{
HashDomain: 0x00,
NumEntries: 0,
HashLen: 3,
FileOffset: 0x5b,
}, buckets[1].BucketHeader)
assert.Equal(t, BucketHeader{
HashDomain: 0x00,
NumEntries: 2,
HashLen: 3,
FileOffset: 0x5b,
}, buckets[2].BucketHeader)
// Test lookups.
entries, err := buckets[2].Load( /*batchSize*/ 4)
require.NoError(t, err)
assert.Equal(t, []Entry{
{
Hash: 0x6b09e3,
Value: 3,
},
{
Hash: 0xbbcd92,
Value: 2,
},
}, entries)
}
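The FileOffset values asserted above follow directly from the new layout: a 32-byte file header, then three 16-byte bucket headers, then the bucket entry arrays appended in order. A quick derivation (not part of the commit):

// Offsets expected by TestBuilder, derived from headerSize (32) and bucketHdrLen (16).
func expectedOffsets() (bucket0, bucket1, bucket2 int64) {
	const stride = 3 + 8                  // HashLen + OffsetWidth for maxValue == math.MaxUint64
	bucket0 = headerSize + 3*bucketHdrLen // 32 + 3*16 = 80 = 0x50
	bucket1 = bucket0 + 1*stride          // bucket 0 holds one entry: 80 + 11 = 91 = 0x5b
	bucket2 = bucket1 + 0*stride          // bucket 1 is empty, so bucket 2 also starts at 0x5b
	return
}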
func TestBuilder_Random(t *testing.T) {
if testing.Short() {
t.Skip("Skipping long test")
}
const numKeys = uint(500000)
const keySize = uint(16)
const maxOffset = uint64(1000000)
const queries = int(100000)
const queries = int(1000)
// Create new builder session.
builder, err := NewBuilder("", numKeys, maxOffset)
require.NoError(t, err)
require.NotNil(t, builder)
require.NotEmpty(t, builder.buckets)
// Ensure we cleaned up after ourselves.
@@ -47,6 +210,7 @@ func TestBuilder(t *testing.T) {
targetFile, err := os.CreateTemp("", "compactindex-final-")
require.NoError(t, err)
defer os.Remove(targetFile.Name())
defer targetFile.Close()
// Seal to final index.
preSeal := time.Now()

View File

@@ -100,9 +100,6 @@ type Header struct {
// headerSize is the size of the header at the beginning of the file.
const headerSize = 32
// bucketOffsetSize is the size of the offsets in the bucket header table.
const bucketOffsetSize = 6
// Load checks the Magic sequence and loads the header fields.
func (h *Header) Load(buf *[headerSize]byte) error {
// Use a magic byte sequence to bail fast when user passes a corrupted/unrelated stream.
@@ -152,21 +149,25 @@ type BucketHeader struct {
HashDomain uint32
NumEntries uint32
HashLen uint8
FileOffset uint64
}
// bucketHeaderSize is the size of the header preceding the hash table entries.
const bucketHeaderSize = 12
// bucketHdrLen is the size of the header preceding the hash table entries.
const bucketHdrLen = 16
func (b *BucketHeader) Store(buf *[bucketHeaderSize]byte) {
func (b *BucketHeader) Store(buf *[bucketHdrLen]byte) {
binary.LittleEndian.PutUint32(buf[0:4], b.HashDomain)
binary.LittleEndian.PutUint32(buf[4:8], b.NumEntries)
buf[8] = b.HashLen
buf[9] = 0
putUintLe(buf[10:16], b.FileOffset)
}
func (b *BucketHeader) Load(buf *[bucketHeaderSize]byte) {
func (b *BucketHeader) Load(buf *[bucketHdrLen]byte) {
b.HashDomain = binary.LittleEndian.Uint32(buf[0:4])
b.NumEntries = binary.LittleEndian.Uint32(buf[4:8])
b.HashLen = buf[8]
b.FileOffset = uintLe(buf[10:16])
}
// Hash returns the per-bucket hash of a key.
@@ -184,7 +185,7 @@ type BucketDescriptor struct {
func (b *BucketDescriptor) loadEntry(buf []byte) (e Entry) {
e.Hash = uintLe(buf[0:b.HashLen])
e.Offset = uintLe(buf[b.HashLen : b.HashLen+b.OffsetWidth])
e.Value = uintLe(buf[b.HashLen : b.HashLen+b.OffsetWidth])
return
}
@@ -193,14 +194,19 @@ func (b *BucketDescriptor) storeEntry(buf []byte, e Entry) {
panic("serializeEntry: buf too small")
}
putUintLe(buf[0:b.HashLen], e.Hash)
putUintLe(buf[b.HashLen:b.HashLen+b.OffsetWidth], e.Offset)
putUintLe(buf[b.HashLen:b.HashLen+b.OffsetWidth], e.Value)
}
// SearchSortedEntries performs an in-memory binary search for a given hash.
func SearchSortedEntries(entries []Entry, hash uint64) *Entry {
i := sort.Search(len(entries), func(i int) bool {
return entries[i].Hash > hash
i, found := sort.Find(len(entries), func(i int) int {
other := entries[i].Hash
// Note: This is safe because neither side exceeds 2^24.
return int(hash) - int(other)
})
if !found {
return nil
}
if i >= len(entries) || entries[i].Hash != hash {
return nil
}
@@ -222,8 +228,8 @@ func EntryHash64(prefix uint32, key []byte) uint64 {
// Entry is a single element in a hash table.
type Entry struct {
Hash uint64
Offset uint64
Hash uint64
Value uint64
}
// intWidth returns the number of bytes minimally required to represent the given integer.
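The body of intWidth is cut off by this hunk; a minimal sketch of what it presumably computes, i.e. the number of little-endian bytes needed to hold the value (so intWidth(math.MaxUint64) == 8, matching the OffsetWidth and stride of 11 seen in the builder and tests):

// intWidth sketch: smallest byte count that can represent n (assumed implementation).
func intWidth(n uint64) uint8 {
	msb := 64 - bits.LeadingZeros64(n) // index of the highest set bit; 0 for n == 0 (uses math/bits)
	return uint8((msb + 7) / 8)
}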

View File

@@ -1,7 +1,6 @@
package compactindex
import (
"encoding/binary"
"errors"
"fmt"
"io"
@@ -41,18 +40,10 @@ func (db *DB) FindBucket(key []byte) (*Bucket, error) {
// GetBucket returns a handle to the bucket at the given index.
func (db *DB) GetBucket(i uint) (*Bucket, error) {
if i > uint(db.Header.NumBuckets) {
if i >= uint(db.Header.NumBuckets) {
return nil, fmt.Errorf("out of bounds bucket index: %d >= %d", i, db.Header.NumBuckets)
}
headerOffset := headerSize + int64(i)*bucketOffsetSize
// Read pointer to bucket header.
var bucketOffsetBuf [8]byte
n, readErr := db.Stream.ReadAt(bucketOffsetBuf[:bucketOffsetSize], headerOffset)
if n < bucketOffsetSize {
return nil, readErr
}
bucketOffset := binary.LittleEndian.Uint64(bucketOffsetBuf[:])
// Fill bucket handle.
bucket := &Bucket{
BucketDescriptor: BucketDescriptor{
@@ -61,11 +52,11 @@ func (db *DB) GetBucket(i uint) (*Bucket, error) {
},
}
// Read bucket header.
bucket.BucketHeader, readErr = readBucketHeader(db.Stream, int64(bucketOffset))
readErr := bucket.BucketHeader.readFrom(db.Stream, i)
if readErr != nil {
return nil, readErr
}
bucket.Entries = io.NewSectionReader(db.Stream, int64(bucketOffset)+bucketHeaderSize, int64(bucket.NumEntries)*int64(bucket.Stride))
bucket.Entries = io.NewSectionReader(db.Stream, int64(bucket.FileOffset), int64(bucket.NumEntries)*int64(bucket.Stride))
return bucket, nil
}
@@ -75,15 +66,25 @@ func (db *DB) entryStride() uint8 {
return uint8(hashSize) + offsetSize
}
func readBucketHeader(rd io.ReaderAt, at int64) (hdr BucketHeader, err error) {
var buf [bucketHeaderSize]byte
var n int
n, err = rd.ReadAt(buf[:], at)
if n >= len(buf) {
err = nil
func bucketOffset(i uint) int64 {
return headerSize + int64(i)*bucketHdrLen
}
func (b *BucketHeader) readFrom(rd io.ReaderAt, i uint) error {
var buf [bucketHdrLen]byte
n, err := rd.ReadAt(buf[:], bucketOffset(i))
if n < len(buf) {
return err
}
hdr.Load(&buf)
return
b.Load(&buf)
return nil
}
func (b *BucketHeader) writeTo(wr io.WriterAt, i uint) error {
var buf [bucketHdrLen]byte
b.Store(&buf)
_, err := wr.WriteAt(buf[:], bucketOffset(i))
return err
}
// Bucket is a database handle pointing to a subset of the index.
@@ -95,6 +96,7 @@ type Bucket struct {
// maxEntriesPerBucket is the hardcoded maximum permitted number of entries per bucket.
const maxEntriesPerBucket = 1 << 24 // (16 * stride) MiB
// targetEntriesPerBucket is the average number of records in each hashtable bucket we aim for.
const targetEntriesPerBucket = 10000
// Load retrieves all entries in the hashtable.
@@ -116,7 +118,7 @@ func (b *Bucket) Load(batchSize int) ([]Entry, error) {
n, err := b.Entries.ReadAt(buf, off)
// Decode all entries in it.
sub := buf[:n]
for len(sub) > stride {
for len(sub) >= stride {
entries = append(entries, b.loadEntry(sub))
sub = sub[stride:]
off += int64(stride)
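For context, the end-to-end query path after this change, sketched as a caller might use it. The Hash helper on the bucket header is assumed to apply EntryHash64 with the bucket's hash domain and truncate to HashLen bytes, as the comments above suggest; lookupValue itself is illustrative and not part of the commit.

// lookupValue sketch: resolve a key to its stored value via the on-disk index.
func lookupValue(f *os.File, key []byte) (uint64, bool, error) {
	db, err := Open(f)
	if err != nil {
		return 0, false, err
	}
	bucket, err := db.FindBucket(key)
	if err != nil {
		return 0, false, err
	}
	entries, err := bucket.Load( /*batchSize*/ 512)
	if err != nil {
		return 0, false, err
	}
	// Hash is assumed to produce the same HashLen-byte per-bucket hash the builder stored.
	entry := SearchSortedEntries(entries, bucket.Hash(key))
	if entry == nil {
		return 0, false, nil // key not in index
	}
	return entry.Value, true, nil
}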