From eaae12101eac0594afc3323b56a12e52744bdda6 Mon Sep 17 00:00:00 2001 From: Ethan Frey Date: Thu, 13 Jul 2017 20:09:25 +0200 Subject: [PATCH] Add queue implementaiton to state --- state/queue.go | 79 +++++++++++++++++++++++++++++++++++++++++++++ state/queue_test.go | 60 ++++++++++++++++++++++++++++++++++ 2 files changed, 139 insertions(+) create mode 100644 state/queue.go create mode 100644 state/queue_test.go diff --git a/state/queue.go b/state/queue.go new file mode 100644 index 000000000..db4869172 --- /dev/null +++ b/state/queue.go @@ -0,0 +1,79 @@ +package state + +import "encoding/binary" + +var ( + headKey = []byte("h") + tailKey = []byte("t") + dataKey = []byte("d") +) + +// Queue allows us to fill up a range of the db, and grab from either end +type Queue struct { + store KVStore + head uint64 // if Size() > 0, the first element is here + tail uint64 // this is the first empty slot to Push() to +} + +// NewQueue will load or initialize a queue in this state-space +// +// Generally, you will want to stack.PrefixStore() the space first +func NewQueue(store KVStore) *Queue { + q := &Queue{store: store} + q.head = q.getCount(headKey) + q.tail = q.getCount(tailKey) + return q +} + +// Tail returns the next slot that Push() will use +func (q *Queue) Tail() uint64 { + return q.tail +} + +// Size returns how many elements are in the queue +func (q *Queue) Size() uint64 { + return q.tail - q.head +} + +// Push adds an element to the tail of the queue and returns it's location +func (q *Queue) Push(value []byte) uint64 { + key := makeKey(q.tail) + q.store.Set(key, value) + q.tail++ + q.setCount(tailKey, q.tail) + return q.tail - 1 +} + +// Pop gets an element from the end of the queue +func (q *Queue) Pop() []byte { + if q.Size() <= 0 { + return nil + } + key := makeKey(q.head) + value := q.store.Get(key) + q.head++ + q.setCount(headKey, q.head) + return value +} + +func (q *Queue) setCount(key []byte, val uint64) { + b := make([]byte, 8) + binary.BigEndian.PutUint64(b, val) + q.store.Set(key, b) +} + +func (q *Queue) getCount(key []byte) (val uint64) { + b := q.store.Get(key) + if b != nil { + val = binary.BigEndian.Uint64(b) + } + return val +} + +// makeKey returns the key for a data point +func makeKey(val uint64) []byte { + b := make([]byte, 8+len(dataKey)) + copy(b, dataKey) + binary.BigEndian.PutUint64(b[len(dataKey):], val) + return b +} diff --git a/state/queue_test.go b/state/queue_test.go new file mode 100644 index 000000000..df367e5bb --- /dev/null +++ b/state/queue_test.go @@ -0,0 +1,60 @@ +package state + +import ( + "testing" + + "github.com/stretchr/testify/assert" +) + +func TestQueue(t *testing.T) { + assert := assert.New(t) + + cases := []struct { + pushes [][]byte + pops [][]byte + }{ + // fill it up and empty it all + { + [][]byte{{1, 2, 3}, {44}, {3, 0}}, + [][]byte{{1, 2, 3}, {44}, {3, 0}}, + }, + // don't empty everything - size is 1 at the end + { + [][]byte{{77, 22}, {11, 9}, {121}}, + [][]byte{{77, 22}, {11, 9}}, + }, + // empty too much, just get nil, no negative size + { + [][]byte{{1}, {2}, {4}}, + [][]byte{{1}, {2}, {4}, nil, nil, nil}, + }, + } + + for i, tc := range cases { + store := NewMemKVStore() + + // initialize a queue and add items + q := NewQueue(store) + for j, in := range tc.pushes { + cnt := q.Push(in) + assert.Equal(uint64(j), cnt, "%d", i) + } + assert.EqualValues(len(tc.pushes), q.Size()) + + // load from disk and pop them + r := NewQueue(store) + for _, out := range tc.pops { + val := r.Pop() + assert.Equal(out, val, "%d", i) + } + + // it's empty in memory and on disk + expected := len(tc.pushes) - len(tc.pops) + if expected < 0 { + expected = 0 + } + assert.EqualValues(expected, r.Size()) + s := NewQueue(store) + assert.EqualValues(expected, s.Size()) + } +}