Add queue implementaiton to state

This commit is contained in:
Ethan Frey 2017-07-13 20:09:25 +02:00
parent 697c2f1e04
commit eaae12101e
2 changed files with 139 additions and 0 deletions

79
state/queue.go Normal file
View File

@ -0,0 +1,79 @@
package state
import "encoding/binary"
var (
headKey = []byte("h")
tailKey = []byte("t")
dataKey = []byte("d")
)
// Queue allows us to fill up a range of the db, and grab from either end
type Queue struct {
store KVStore
head uint64 // if Size() > 0, the first element is here
tail uint64 // this is the first empty slot to Push() to
}
// NewQueue will load or initialize a queue in this state-space
//
// Generally, you will want to stack.PrefixStore() the space first
func NewQueue(store KVStore) *Queue {
q := &Queue{store: store}
q.head = q.getCount(headKey)
q.tail = q.getCount(tailKey)
return q
}
// Tail returns the next slot that Push() will use
func (q *Queue) Tail() uint64 {
return q.tail
}
// Size returns how many elements are in the queue
func (q *Queue) Size() uint64 {
return q.tail - q.head
}
// Push adds an element to the tail of the queue and returns it's location
func (q *Queue) Push(value []byte) uint64 {
key := makeKey(q.tail)
q.store.Set(key, value)
q.tail++
q.setCount(tailKey, q.tail)
return q.tail - 1
}
// Pop gets an element from the end of the queue
func (q *Queue) Pop() []byte {
if q.Size() <= 0 {
return nil
}
key := makeKey(q.head)
value := q.store.Get(key)
q.head++
q.setCount(headKey, q.head)
return value
}
func (q *Queue) setCount(key []byte, val uint64) {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, val)
q.store.Set(key, b)
}
func (q *Queue) getCount(key []byte) (val uint64) {
b := q.store.Get(key)
if b != nil {
val = binary.BigEndian.Uint64(b)
}
return val
}
// makeKey returns the key for a data point
func makeKey(val uint64) []byte {
b := make([]byte, 8+len(dataKey))
copy(b, dataKey)
binary.BigEndian.PutUint64(b[len(dataKey):], val)
return b
}

60
state/queue_test.go Normal file
View File

@ -0,0 +1,60 @@
package state
import (
"testing"
"github.com/stretchr/testify/assert"
)
func TestQueue(t *testing.T) {
assert := assert.New(t)
cases := []struct {
pushes [][]byte
pops [][]byte
}{
// fill it up and empty it all
{
[][]byte{{1, 2, 3}, {44}, {3, 0}},
[][]byte{{1, 2, 3}, {44}, {3, 0}},
},
// don't empty everything - size is 1 at the end
{
[][]byte{{77, 22}, {11, 9}, {121}},
[][]byte{{77, 22}, {11, 9}},
},
// empty too much, just get nil, no negative size
{
[][]byte{{1}, {2}, {4}},
[][]byte{{1}, {2}, {4}, nil, nil, nil},
},
}
for i, tc := range cases {
store := NewMemKVStore()
// initialize a queue and add items
q := NewQueue(store)
for j, in := range tc.pushes {
cnt := q.Push(in)
assert.Equal(uint64(j), cnt, "%d", i)
}
assert.EqualValues(len(tc.pushes), q.Size())
// load from disk and pop them
r := NewQueue(store)
for _, out := range tc.pops {
val := r.Pop()
assert.Equal(out, val, "%d", i)
}
// it's empty in memory and on disk
expected := len(tc.pushes) - len(tc.pops)
if expected < 0 {
expected = 0
}
assert.EqualValues(expected, r.Size())
s := NewQueue(store)
assert.EqualValues(expected, s.Size())
}
}