diff --git a/pkg/compactindex/BUILD b/pkg/compactindex/BUILD
index 915ddc0..dda641b 100644
--- a/pkg/compactindex/BUILD
+++ b/pkg/compactindex/BUILD
@@ -9,6 +9,9 @@ go_library(
         "@platforms//os:osx": ["build.go"],
         "@platforms//os:linux": ["build.go"],
         "//conditions:default": [],
+    }) + select({
+        "@platforms//os:linux": ["fallocate_linux.go"],
+        "//conditions:default": ["fallocate_generic.go"],
     }),
     importpath = "go.firedancer.io/radiance/pkg/compactindex",
     visibility = ["//visibility:public"],
@@ -22,10 +25,14 @@ go_test(
     srcs = [
        "compactindex_test.go",
        "query_test.go",
-    ],
+    ] + select({
+        "@platforms//os:osx": ["build_test.go"],
+        "@platforms//os:linux": ["build_test.go"],
+        "//conditions:default": [],
+    }),
     embed = [":compactindex"],
     deps = [
         "@com_github_stretchr_testify//assert",
         "@com_github_stretchr_testify//require",
-    ]
+    ],
 )
diff --git a/pkg/compactindex/build.go b/pkg/compactindex/build.go
index f7b5a30..5e65a54 100644
--- a/pkg/compactindex/build.go
+++ b/pkg/compactindex/build.go
@@ -87,8 +87,8 @@ func (b *Builder) Seal(ctx context.Context, f *os.File) (err error) {
 	if err != nil {
 		return fmt.Errorf("failed to write header: %w", err)
 	}
-	// Create hole to leave space for bucket offset table.
-	bucketTableLen := int64(b.NumBuckets) * bucketOffsetSize
+	// Create hole to leave space for bucket header table.
+	bucketTableLen := int64(b.NumBuckets) * bucketHdrLen
 	err = fallocate(f, headerSize, bucketTableLen)
 	if err != nil {
 		return fmt.Errorf("failed to fallocate() bucket table: %w", err)
@@ -114,10 +114,13 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
 	if err != nil {
 		return fmt.Errorf("failed to mine bucket %d: %w", i, err)
 	}
-	// Seek to end of file.
-	bucketOffset, err := f.Seek(0, io.SeekEnd)
+	// Find current file length.
+	offset, err := f.Seek(0, io.SeekEnd)
 	if err != nil {
-		return fmt.Errorf("failed to seek to index EOF: %w", err)
+		return fmt.Errorf("failed to seek to EOF: %w", err)
+	}
+	if offset < 0 {
+		panic("os.File.Seek() < 0")
 	}
 	// Write header to file.
 	desc := BucketDescriptor{
@@ -125,15 +128,13 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
 			HashDomain: domain,
 			NumEntries: uint32(bucket.records),
 			HashLen:    3, // TODO remove hardcoded constant
+			FileOffset: uint64(offset),
 		},
-	}
-	wr := bufio.NewWriter(f)
-	var headerBuf [bucketHeaderSize]byte
-	desc.BucketHeader.Store(&headerBuf)
-	if _, err := wr.Write(headerBuf[:]); err != nil {
-		return fmt.Errorf("failed to write bucket header: %w", err)
+		Stride:      3 + intWidth(b.FileSize), // TODO remove hardcoded constant
+		OffsetWidth: intWidth(b.FileSize),
 	}
 	// Write entries to file.
+	wr := bufio.NewWriter(f)
 	entryBuf := make([]byte, desc.HashLen+intWidth(b.FileSize)) // TODO remove hardcoded constant
 	for _, entry := range entries {
 		desc.storeEntry(entryBuf, entry)
@@ -144,13 +145,9 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
 	if err := wr.Flush(); err != nil {
 		return fmt.Errorf("failed to flush bucket to index: %w", err)
 	}
-	// Write offset to header.
-	var bucketOffsetBuf [8]byte
-	binary.LittleEndian.PutUint64(bucketOffsetBuf[:], uint64(bucketOffset))
-	pointerOffset := headerSize + (int64(i) * bucketOffsetSize)
-	_, err = f.WriteAt(bucketOffsetBuf[:bucketOffsetSize], pointerOffset)
-	if err != nil {
-		return fmt.Errorf("failed to write bucket offset to header (at %#x => %#x)", pointerOffset, bucketOffset)
+	// Write header to file.
+	if err := desc.BucketHeader.writeTo(f, uint(i)); err != nil {
+		return fmt.Errorf("failed to write bucket header %d: %w", i, err)
 	}
 	return nil
 }
@@ -233,13 +230,13 @@ func hashBucket(rd *bufio.Reader, entries []Entry, bitmap []byte, nonce uint32)
 	// Scan provided reader for entries and hash along the way.
 	for i := range entries {
-		// Read next key from file
+		// Read next key from file (as defined by writeTuple)
 		var static [10]byte
 		if _, err := io.ReadFull(rd, static[:]); err != nil {
 			return err
 		}
 		keyLen := binary.LittleEndian.Uint16(static[0:2])
-		value := binary.LittleEndian.Uint64(static[0:10])
+		value := binary.LittleEndian.Uint64(static[2:10])
 		key := make([]byte, keyLen)
 		if _, err := io.ReadFull(rd, key); err != nil {
 			return err
@@ -258,8 +255,8 @@ func hashBucket(rd *bufio.Reader, entries []Entry, bitmap []byte, nonce uint32)
 
 		// Export entry
 		entries[i] = Entry{
-			Hash:   hash,
-			Offset: value,
+			Hash:  hash,
+			Value: value,
 		}
 	}
 
diff --git a/pkg/compactindex/build_test.go b/pkg/compactindex/build_test.go
index e68da6a..7bc0d3b 100644
--- a/pkg/compactindex/build_test.go
+++ b/pkg/compactindex/build_test.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"io"
 	"io/fs"
+	"math"
 	"math/rand"
 	"os"
 	"testing"
@@ -16,14 +17,176 @@ import (
 )
 
 func TestBuilder(t *testing.T) {
-	const numKeys = uint(1000000)
+	const numBuckets = 3
+	const maxValue = math.MaxUint64
+
+	// Create a table with 3 buckets.
+	builder, err := NewBuilder("", numBuckets*targetEntriesPerBucket, maxValue)
+	require.NoError(t, err)
+	require.NotNil(t, builder)
+	assert.Len(t, builder.buckets, 3)
+	defer builder.Close()
+
+	// Insert a few entries.
+	require.NoError(t, builder.Insert([]byte("hello"), 1))
+	require.NoError(t, builder.Insert([]byte("world"), 2))
+	require.NoError(t, builder.Insert([]byte("blub"), 3))
+
+	// Create index file.
+	targetFile, err := os.CreateTemp("", "compactindex-final-")
+	require.NoError(t, err)
+	defer os.Remove(targetFile.Name())
+	defer targetFile.Close()
+
+	// Seal index.
+	require.NoError(t, builder.Seal(context.TODO(), targetFile))
+
+	// Assert binary content.
+	buf, err := os.ReadFile(targetFile.Name())
+	require.NoError(t, err)
+	assert.Equal(t, []byte{
+		// --- File header
+		// magic
+		0x72, 0x64, 0x63, 0x65, 0x63, 0x69, 0x64, 0x78,
+		// max file size
+		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
+		// num buckets
+		0x03, 0x00, 0x00, 0x00,
+		// padding
+		0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+
+		// --- Bucket header 0
+		// hash domain
+		0x00, 0x00, 0x00, 0x00,
+		// num entries
+		0x01, 0x00, 0x00, 0x00,
+		// hash len
+		0x03,
+		// padding
+		0x00,
+		// file offset
+		0x50, 0x00, 0x00, 0x00, 0x00, 0x00,
+
+		// --- Bucket header 1
+		// hash domain
+		0x00, 0x00, 0x00, 0x00,
+		// num entries
+		0x00, 0x00, 0x00, 0x00,
+		// hash len
+		0x03,
+		// padding
+		0x00,
+		// file offset
+		0x5b, 0x00, 0x00, 0x00, 0x00, 0x00,
+
+		// --- Bucket header 2
+		// hash domain
+		0x00, 0x00, 0x00, 0x00,
+		// num entries
+		0x02, 0x00, 0x00, 0x00,
+		// hash len
+		0x03,
+		// padding
+		0x00,
+		// file offset
+		0x5b, 0x00, 0x00, 0x00, 0x00, 0x00,
+
+		// --- Bucket 0
+		// hash
+		0xe2, 0xdb, 0x55,
+		// value
+		0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+
+		// --- Bucket 2
+		// hash
+		0xe3, 0x09, 0x6b,
+		// value
+		0x03, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+		// hash
+		0x92, 0xcd, 0xbb,
+		// value
+		0x02, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
+	}, buf)
+
+	// Reset file offset.
+	_, seekErr := targetFile.Seek(0, io.SeekStart)
+	require.NoError(t, seekErr)
+
+	// Open index.
+	db, err := Open(targetFile)
+	require.NoError(t, err, "Failed to open generated index")
+	require.NotNil(t, db)
+
+	// File header assertions.
+	assert.Equal(t, Header{
+		FileSize:   maxValue,
+		NumBuckets: numBuckets,
+	}, db.Header)
+
+	// Get bucket handles.
+	buckets := make([]*Bucket, numBuckets)
+	for i := range buckets {
+		buckets[i], err = db.GetBucket(uint(i))
+		require.NoError(t, err)
+	}
+
+	// Ensure out-of-bounds bucket accesses fail.
+	_, wantErr := db.GetBucket(numBuckets)
+	assert.EqualError(t, wantErr, "out of bounds bucket index: 3 >= 3")
+
+	// Bucket header assertions.
+	assert.Equal(t, BucketDescriptor{
+		BucketHeader: BucketHeader{
+			HashDomain: 0x00,
+			NumEntries: 1,
+			HashLen:    3,
+			FileOffset: 0x50,
+		},
+		Stride:      11, // 3 + 8
+		OffsetWidth: 8,
+	}, buckets[0].BucketDescriptor)
+	assert.Equal(t, BucketHeader{
+		HashDomain: 0x00,
+		NumEntries: 0,
+		HashLen:    3,
+		FileOffset: 0x5b,
+	}, buckets[1].BucketHeader)
+	assert.Equal(t, BucketHeader{
+		HashDomain: 0x00,
+		NumEntries: 2,
+		HashLen:    3,
+		FileOffset: 0x5b,
+	}, buckets[2].BucketHeader)
+
+	// Test lookups.
+	entries, err := buckets[2].Load( /*batchSize*/ 4)
+	require.NoError(t, err)
+	assert.Equal(t, []Entry{
+		{
+			Hash:  0x6b09e3,
+			Value: 3,
+		},
+		{
+			Hash:  0xbbcd92,
+			Value: 2,
+		},
+	}, entries)
+}
+
+func TestBuilder_Random(t *testing.T) {
+	if testing.Short() {
+		t.Skip("Skipping long test")
+	}
+
+	const numKeys = uint(500000)
 	const keySize = uint(16)
 	const maxOffset = uint64(1000000)
-	const queries = int(100000)
+	const queries = int(1000)
 
 	// Create new builder session.
 	builder, err := NewBuilder("", numKeys, maxOffset)
 	require.NoError(t, err)
+	require.NotNil(t, builder)
 	require.NotEmpty(t, builder.buckets)
 
 	// Ensure we cleaned up after ourselves.
@@ -47,6 +210,7 @@ func TestBuilder(t *testing.T) {
 	targetFile, err := os.CreateTemp("", "compactindex-final-")
 	require.NoError(t, err)
 	defer os.Remove(targetFile.Name())
+	defer targetFile.Close()
 
 	// Seal to final index.
 	preSeal := time.Now()
diff --git a/pkg/compactindex/compactindex.go b/pkg/compactindex/compactindex.go
index 48bccd9..7d0bbb6 100644
--- a/pkg/compactindex/compactindex.go
+++ b/pkg/compactindex/compactindex.go
@@ -100,9 +100,6 @@ type Header struct {
 // headerSize is the size of the header at the beginning of the file.
 const headerSize = 32
 
-// bucketOffsetSize is the size of the offsets in the bucket header table.
-const bucketOffsetSize = 6
-
 // Load checks the Magic sequence and loads the header fields.
 func (h *Header) Load(buf *[headerSize]byte) error {
 	// Use a magic byte sequence to bail fast when user passes a corrupted/unrelated stream.
@@ -152,21 +149,25 @@ type BucketHeader struct {
 	HashDomain uint32
 	NumEntries uint32
 	HashLen    uint8
+	FileOffset uint64
 }
 
-// bucketHeaderSize is the size of the header preceding the hash table entries.
-const bucketHeaderSize = 12
+// bucketHdrLen is the size of the header preceding the hash table entries.
+const bucketHdrLen = 16
 
-func (b *BucketHeader) Store(buf *[bucketHeaderSize]byte) {
+func (b *BucketHeader) Store(buf *[bucketHdrLen]byte) {
 	binary.LittleEndian.PutUint32(buf[0:4], b.HashDomain)
 	binary.LittleEndian.PutUint32(buf[4:8], b.NumEntries)
 	buf[8] = b.HashLen
+	buf[9] = 0
+	putUintLe(buf[10:16], b.FileOffset)
 }
 
-func (b *BucketHeader) Load(buf *[bucketHeaderSize]byte) {
+func (b *BucketHeader) Load(buf *[bucketHdrLen]byte) {
 	b.HashDomain = binary.LittleEndian.Uint32(buf[0:4])
 	b.NumEntries = binary.LittleEndian.Uint32(buf[4:8])
 	b.HashLen = buf[8]
+	b.FileOffset = uintLe(buf[10:16])
 }
 
 // Hash returns the per-bucket hash of a key.
@@ -184,7 +185,7 @@ type BucketDescriptor struct {
 
 func (b *BucketDescriptor) loadEntry(buf []byte) (e Entry) {
 	e.Hash = uintLe(buf[0:b.HashLen])
-	e.Offset = uintLe(buf[b.HashLen : b.HashLen+b.OffsetWidth])
+	e.Value = uintLe(buf[b.HashLen : b.HashLen+b.OffsetWidth])
 	return
 }
 
@@ -193,14 +194,19 @@ func (b *BucketDescriptor) storeEntry(buf []byte, e Entry) {
 		panic("serializeEntry: buf too small")
 	}
 	putUintLe(buf[0:b.HashLen], e.Hash)
-	putUintLe(buf[b.HashLen:b.HashLen+b.OffsetWidth], e.Offset)
+	putUintLe(buf[b.HashLen:b.HashLen+b.OffsetWidth], e.Value)
 }
 
 // SearchSortedEntries performs an in-memory binary search for a given hash.
 func SearchSortedEntries(entries []Entry, hash uint64) *Entry {
-	i := sort.Search(len(entries), func(i int) bool {
-		return entries[i].Hash > hash
+	i, found := sort.Find(len(entries), func(i int) int {
+		other := entries[i].Hash
+		// Note: This is safe because neither side exceeds 2^24.
+		return int(hash) - int(other)
 	})
+	if !found {
+		return nil
+	}
 	if i >= len(entries) || entries[i].Hash != hash {
 		return nil
 	}
@@ -222,8 +228,8 @@ func EntryHash64(prefix uint32, key []byte) uint64 {
 
 // Entry is a single element in a hash table.
 type Entry struct {
-	Hash   uint64
-	Offset uint64
+	Hash  uint64
+	Value uint64
 }
 
 // intWidth returns the number of bytes minimally required to represent the given integer.
diff --git a/pkg/compactindex/query.go b/pkg/compactindex/query.go
index 8b3c843..a220f75 100644
--- a/pkg/compactindex/query.go
+++ b/pkg/compactindex/query.go
@@ -1,7 +1,6 @@
 package compactindex
 
 import (
-	"encoding/binary"
 	"errors"
 	"fmt"
 	"io"
@@ -41,18 +40,10 @@ func (db *DB) FindBucket(key []byte) (*Bucket, error) {
 
 // GetBucket returns a handle to the bucket at the given index.
 func (db *DB) GetBucket(i uint) (*Bucket, error) {
-	if i > uint(db.Header.NumBuckets) {
+	if i >= uint(db.Header.NumBuckets) {
 		return nil, fmt.Errorf("out of bounds bucket index: %d >= %d", i, db.Header.NumBuckets)
 	}
 
-	headerOffset := headerSize + int64(i)*bucketOffsetSize
-	// Read pointer to bucket header.
-	var bucketOffsetBuf [8]byte
-	n, readErr := db.Stream.ReadAt(bucketOffsetBuf[:bucketOffsetSize], headerOffset)
-	if n < bucketOffsetSize {
-		return nil, readErr
-	}
-	bucketOffset := binary.LittleEndian.Uint64(bucketOffsetBuf[:])
 	// Fill bucket handle.
 	bucket := &Bucket{
 		BucketDescriptor: BucketDescriptor{
@@ -61,11 +52,11 @@ func (db *DB) GetBucket(i uint) (*Bucket, error) {
 		},
 	}
 	// Read bucket header.
-	bucket.BucketHeader, readErr = readBucketHeader(db.Stream, int64(bucketOffset))
+	readErr := bucket.BucketHeader.readFrom(db.Stream, i)
 	if readErr != nil {
 		return nil, readErr
 	}
-	bucket.Entries = io.NewSectionReader(db.Stream, int64(bucketOffset)+bucketHeaderSize, int64(bucket.NumEntries)*int64(bucket.Stride))
+	bucket.Entries = io.NewSectionReader(db.Stream, int64(bucket.FileOffset), int64(bucket.NumEntries)*int64(bucket.Stride))
 	return bucket, nil
 }
 
@@ -75,15 +66,25 @@ func (db *DB) entryStride() uint8 {
 	return uint8(hashSize) + offsetSize
 }
 
-func readBucketHeader(rd io.ReaderAt, at int64) (hdr BucketHeader, err error) {
-	var buf [bucketHeaderSize]byte
-	var n int
-	n, err = rd.ReadAt(buf[:], at)
-	if n >= len(buf) {
-		err = nil
+func bucketOffset(i uint) int64 {
+	return headerSize + int64(i)*bucketHdrLen
+}
+
+func (b *BucketHeader) readFrom(rd io.ReaderAt, i uint) error {
+	var buf [bucketHdrLen]byte
+	n, err := rd.ReadAt(buf[:], bucketOffset(i))
+	if n < len(buf) {
+		return err
 	}
-	hdr.Load(&buf)
-	return
+	b.Load(&buf)
+	return nil
+}
+
+func (b *BucketHeader) writeTo(wr io.WriterAt, i uint) error {
+	var buf [bucketHdrLen]byte
+	b.Store(&buf)
+	_, err := wr.WriteAt(buf[:], bucketOffset(i))
+	return err
 }
 
 // Bucket is a database handle pointing to a subset of the index.
@@ -95,6 +96,7 @@ type Bucket struct {
 // maxEntriesPerBucket is the hardcoded maximum permitted number of entries per bucket.
 const maxEntriesPerBucket = 1 << 24 // (16 * stride) MiB
 
+// targetEntriesPerBucket is the average number of records in each hashtable bucket we aim for.
 const targetEntriesPerBucket = 10000
 
 // Load retrieves all entries in the hashtable.
@@ -116,7 +118,7 @@ func (b *Bucket) Load(batchSize int) ([]Entry, error) {
 		n, err := b.Entries.ReadAt(buf, off)
 		// Decode all entries in it.
 		sub := buf[:n]
-		for len(sub) > stride {
+		for len(sub) >= stride {
 			entries = append(entries, b.loadEntry(sub))
 			sub = sub[stride:]
 			off += int64(stride)
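For reviewers, a minimal standalone sketch of the on-disk layout this patch introduces: bucket headers now live in a fixed table directly after the 32-byte file header, 16 bytes apiece (bucketHdrLen), laid out as a 4-byte hash domain, a 4-byte entry count, a 1-byte hash length, one padding byte, and a 6-byte little-endian file offset pointing at the bucket's entry array. The helpers below mirror bucketOffset and BucketHeader.Load from the diff, but the names (bucketHeaderPos, decodeBucketHeader) are illustrative only and not part of the package.

package main

import (
	"encoding/binary"
	"fmt"
)

// Constants mirroring the patch: a 32-byte file header followed by
// one 16-byte header per bucket.
const (
	headerSize   = 32
	bucketHdrLen = 16
)

// bucketHeaderPos mirrors bucketOffset(i) from query.go: header i sits at a
// fixed slot in the table right after the file header.
func bucketHeaderPos(i uint) int64 {
	return headerSize + int64(i)*bucketHdrLen
}

// decodeBucketHeader mirrors BucketHeader.Load: little-endian fields, with
// the 6-byte file offset widened to a uint64.
func decodeBucketHeader(buf [bucketHdrLen]byte) (hashDomain, numEntries uint32, hashLen uint8, fileOffset uint64) {
	hashDomain = binary.LittleEndian.Uint32(buf[0:4])
	numEntries = binary.LittleEndian.Uint32(buf[4:8])
	hashLen = buf[8]
	// buf[9] is padding (written as zero by BucketHeader.Store).
	var off [8]byte
	copy(off[:], buf[10:16])
	fileOffset = binary.LittleEndian.Uint64(off[:])
	return
}

func main() {
	// Bucket header 0 from the TestBuilder fixture above.
	hdr := [bucketHdrLen]byte{
		0x00, 0x00, 0x00, 0x00, // hash domain
		0x01, 0x00, 0x00, 0x00, // num entries
		0x03, // hash len
		0x00, // padding
		0x50, 0x00, 0x00, 0x00, 0x00, 0x00, // file offset
	}
	domain, entries, hashLen, offset := decodeBucketHeader(hdr)
	fmt.Println(bucketHeaderPos(0), domain, entries, hashLen, offset) // 32 0 1 3 80
}

On the TestBuilder fixture this prints 32 0 1 3 80, which lines up with the FileOffset: 0x50 assertion for bucket 0 in the test.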