diff --git a/pkg/compactindex/build.go b/pkg/compactindex/build.go
index 5e65a54..42fbcd8 100644
--- a/pkg/compactindex/build.go
+++ b/pkg/compactindex/build.go
@@ -137,7 +137,7 @@ func (b *Builder) sealBucket(ctx context.Context, i int, f *os.File) error {
 	wr := bufio.NewWriter(f)
 	entryBuf := make([]byte, desc.HashLen+intWidth(b.FileSize)) // TODO remove hardcoded constant
 	for _, entry := range entries {
-		desc.storeEntry(entryBuf, entry)
+		desc.marshalEntry(entryBuf, entry)
 		if _, err := wr.Write(entryBuf[:]); err != nil {
 			return fmt.Errorf("failed to write record to index: %w", err)
 		}
diff --git a/pkg/compactindex/build_test.go b/pkg/compactindex/build_test.go
index 7bc0d3b..1f70229 100644
--- a/pkg/compactindex/build_test.go
+++ b/pkg/compactindex/build_test.go
@@ -14,6 +14,7 @@ import (
 
 	"github.com/stretchr/testify/assert"
 	"github.com/stretchr/testify/require"
+	"github.com/vbauerster/mpb/v8/decor"
 )
 
 func TestBuilder(t *testing.T) {
@@ -181,7 +182,7 @@ func TestBuilder_Random(t *testing.T) {
 	const numKeys = uint(500000)
 	const keySize = uint(16)
 	const maxOffset = uint64(1000000)
-	const queries = int(1000)
+	const queries = int(10000)
 
 	// Create new builder session.
 	builder, err := NewBuilder("", numKeys, maxOffset)
@@ -221,7 +222,7 @@ func TestBuilder_Random(t *testing.T) {
 	// Print some stats.
 	targetStat, err := targetFile.Stat()
 	require.NoError(t, err)
-	t.Logf("Index size: %d", targetStat.Size())
+	t.Logf("Index size: %d (% .2f)", targetStat.Size(), decor.SizeB1000(targetStat.Size()))
 	t.Logf("Bytes per entry: %f", float64(targetStat.Size())/float64(numKeys))
 	t.Logf("Indexing speed: %f/s", float64(numKeys)/time.Since(preInsert).Seconds())
 
@@ -237,16 +238,12 @@ func TestBuilder_Random(t *testing.T) {
 		keyN := uint64(rand.Int63n(int64(numKeys)))
 		binary.LittleEndian.PutUint64(key, keyN)
 
-		bucket, err := db.FindBucket(key)
+		bucket, err := db.LookupBucket(key)
 		require.NoError(t, err)
 
-		hash := EntryHash64(bucket.HashDomain, key) & 0xFFFFFF // TODO fix mask
-		entries, err := bucket.Load(1024)
+		value, err := bucket.Lookup(key)
 		require.NoError(t, err)
-
-		// XXX use external binary search here
-		entry := SearchSortedEntries(entries, hash)
-		require.NotNil(t, entry)
+		require.True(t, value > 0)
 	}
 	t.Logf("Queried %d items", queries)
 	t.Logf("Query speed: %f/s", float64(queries)/time.Since(preQuery).Seconds())
diff --git a/pkg/compactindex/compactindex.go b/pkg/compactindex/compactindex.go
index 7d0bbb6..98430dc 100644
--- a/pkg/compactindex/compactindex.go
+++ b/pkg/compactindex/compactindex.go
@@ -183,13 +183,13 @@ type BucketDescriptor struct {
 	OffsetWidth uint8 // with of offset field in bucket
 }
 
-func (b *BucketDescriptor) loadEntry(buf []byte) (e Entry) {
+func (b *BucketDescriptor) unmarshalEntry(buf []byte) (e Entry) {
 	e.Hash = uintLe(buf[0:b.HashLen])
 	e.Value = uintLe(buf[b.HashLen : b.HashLen+b.OffsetWidth])
 	return
 }
 
-func (b *BucketDescriptor) storeEntry(buf []byte, e Entry) {
+func (b *BucketDescriptor) marshalEntry(buf []byte, e Entry) {
 	if len(buf) < int(b.Stride) {
 		panic("serializeEntry: buf too small")
 	}
diff --git a/pkg/compactindex/query.go b/pkg/compactindex/query.go
index a220f75..74f91c9 100644
--- a/pkg/compactindex/query.go
+++ b/pkg/compactindex/query.go
@@ -33,8 +33,19 @@ func Open(stream io.ReaderAt) (*DB, error) {
 	return db, nil
 }
 
-// FindBucket returns a handle to the bucket that might contain the given key.
-func (db *DB) FindBucket(key []byte) (*Bucket, error) {
+// Lookup queries for a key in the index and returns the value (offset), if any.
+//
+// Returns ErrNotFound if the key is unknown.
+func (db *DB) Lookup(key []byte) (uint64, error) {
+	bucket, err := db.LookupBucket(key)
+	if err != nil {
+		return 0, err
+	}
+	return bucket.Lookup(key)
+}
+
+// LookupBucket returns a handle to the bucket that might contain the given key.
+func (db *DB) LookupBucket(key []byte) (*Bucket, error) {
 	return db.GetBucket(db.Header.BucketHash(key))
 }
 
@@ -102,7 +113,7 @@ const targetEntriesPerBucket = 10000
 // Load retrieves all entries in the hashtable.
 func (b *Bucket) Load(batchSize int) ([]Entry, error) {
 	if batchSize <= 0 {
-		batchSize = 1
+		batchSize = 512 // default to a reasonable batch size
 	}
 	// TODO bounds check
 	if b.NumEntries > maxEntriesPerBucket {
@@ -119,7 +130,7 @@ func (b *Bucket) Load(batchSize int) ([]Entry, error) {
 		// Decode all entries in it.
 		sub := buf[:n]
 		for len(sub) >= stride {
-			entries = append(entries, b.unmarshalEntry(sub))
+			entries = append(entries, b.unmarshalEntry(sub))
 			sub = sub[stride:]
 			off += int64(stride)
 		}
@@ -134,4 +145,42 @@ func (b *Bucket) Load(batchSize int) ([]Entry, error) {
 	return entries, nil
 }
 
+// TODO: This binary search algo is not optimized for high-latency remotes yet.
+
+// Lookup queries for a key using binary search.
+func (b *Bucket) Lookup(key []byte) (uint64, error) {
+	return b.binarySearch(b.Hash(key))
+}
+
+func (b *Bucket) binarySearch(target uint64) (uint64, error) {
+	low := 0
+	high := int(b.NumEntries) - 1 // last valid entry index
+	for low <= high {
+		median := (low + high) / 2
+		entry, err := b.loadEntry(median)
+		if err != nil {
+			return 0, err
+		}
+		if entry.Hash == target {
+			return entry.Value, nil
+		} else if entry.Hash < target {
+			low = median + 1
+		} else {
+			high = median - 1
+		}
+	}
+	return 0, ErrNotFound
+}
+
+// loadEntry reads and decodes the i-th entry of the bucket.
+func (b *Bucket) loadEntry(i int) (Entry, error) {
+	off := int64(i) * int64(b.Stride)
+	buf := make([]byte, b.Stride)
+	n, err := b.Entries.ReadAt(buf, off)
+	if n != len(buf) {
+		return Entry{}, err
+	}
+	return b.unmarshalEntry(buf), nil
+}
+
+// ErrNotFound marks a missing entry.
 var ErrNotFound = errors.New("not found")
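
Usage sketch for the new one-shot query path introduced above (illustrative, not part of the diff): open an index file, then resolve a key straight to its stored offset with db.Lookup, which hashes the key, picks the bucket via LookupBucket, and binary-searches the bucket's sorted entries. The import path, index file name, and key bytes below are assumptions.

	package main

	import (
		"errors"
		"fmt"
		"os"

		"go.firedancer.io/radiance/pkg/compactindex" // assumed import path
	)

	func main() {
		// *os.File satisfies the io.ReaderAt that compactindex.Open expects.
		f, err := os.Open("payments.index") // hypothetical index file
		if err != nil {
			panic(err)
		}
		defer f.Close()

		db, err := compactindex.Open(f)
		if err != nil {
			panic(err)
		}

		// One-shot lookup: replaces the old FindBucket + Load + SearchSortedEntries sequence.
		offset, err := db.Lookup([]byte("example-key")) // hypothetical key
		if errors.Is(err, compactindex.ErrNotFound) {
			fmt.Println("key not present")
			return
		} else if err != nil {
			panic(err)
		}
		fmt.Printf("value offset: %d\n", offset)
	}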