From 5d67251487804e96d57d273836093c19c28acff1 Mon Sep 17 00:00:00 2001 From: aaronbuchwald Date: Wed, 3 Jun 2020 14:07:44 -0400 Subject: [PATCH] Optimize bubble votes in avalanche with vertex heap (#47) --- ids/unique_bag.go | 3 + snow/consensus/avalanche/consensus.go | 3 + snow/consensus/avalanche/vertex_test.go | 3 +- snow/engine/avalanche/engine_test.go | 3 +- snow/engine/avalanche/state/unique_vertex.go | 6 + snow/engine/avalanche/vertex_heap.go | 109 ++++++++++++++++ snow/engine/avalanche/vertex_heap_test.go | 130 +++++++++++++++++++ snow/engine/avalanche/voter.go | 50 ++++--- 8 files changed, 284 insertions(+), 23 deletions(-) create mode 100644 snow/engine/avalanche/vertex_heap.go create mode 100644 snow/engine/avalanche/vertex_heap_test.go diff --git a/ids/unique_bag.go b/ids/unique_bag.go index 6117cb7..d5d3e36 100644 --- a/ids/unique_bag.go +++ b/ids/unique_bag.go @@ -62,6 +62,9 @@ func (b *UniqueBag) Difference(diff *UniqueBag) { // GetSet ... func (b *UniqueBag) GetSet(id ID) BitSet { return (*b)[*id.ID] } +// RemoveSet ... +func (b *UniqueBag) RemoveSet(id ID) { delete(*b, id.Key()) } + // List ... func (b *UniqueBag) List() []ID { idList := []ID(nil) diff --git a/snow/consensus/avalanche/consensus.go b/snow/consensus/avalanche/consensus.go index 2237c68..d88a355 100644 --- a/snow/consensus/avalanche/consensus.go +++ b/snow/consensus/avalanche/consensus.go @@ -77,6 +77,9 @@ type Vertex interface { // Returns the vertices this vertex depends on Parents() []Vertex + // Returns the height of this vertex + Height() uint64 + // Returns a series of state transitions to be performed on acceptance Txs() []snowstorm.Tx diff --git a/snow/consensus/avalanche/vertex_test.go b/snow/consensus/avalanche/vertex_test.go index 6beceba..f7aee5a 100644 --- a/snow/consensus/avalanche/vertex_test.go +++ b/snow/consensus/avalanche/vertex_test.go @@ -16,7 +16,7 @@ type Vtx struct { id ids.ID txs []snowstorm.Tx - height int + height uint64 status choices.Status bytes []byte @@ -25,6 +25,7 @@ type Vtx struct { func (v *Vtx) ID() ids.ID { return v.id } func (v *Vtx) ParentIDs() []ids.ID { return nil } func (v *Vtx) Parents() []Vertex { return v.dependencies } +func (v *Vtx) Height() uint64 { return v.height } func (v *Vtx) Txs() []snowstorm.Tx { return v.txs } func (v *Vtx) Status() choices.Status { return v.status } func (v *Vtx) Live() {} diff --git a/snow/engine/avalanche/engine_test.go b/snow/engine/avalanche/engine_test.go index ee16c8a..cb98307 100644 --- a/snow/engine/avalanche/engine_test.go +++ b/snow/engine/avalanche/engine_test.go @@ -27,7 +27,7 @@ type Vtx struct { id ids.ID txs []snowstorm.Tx - height int + height uint64 status choices.Status bytes []byte @@ -36,6 +36,7 @@ type Vtx struct { func (v *Vtx) ID() ids.ID { return v.id } func (v *Vtx) DependencyIDs() []ids.ID { return nil } func (v *Vtx) Parents() []avalanche.Vertex { return v.parents } +func (v *Vtx) Height() uint64 { return v.height } func (v *Vtx) Txs() []snowstorm.Tx { return v.txs } func (v *Vtx) Status() choices.Status { return v.status } func (v *Vtx) Accept() error { v.status = choices.Accepted; return nil } diff --git a/snow/engine/avalanche/state/unique_vertex.go b/snow/engine/avalanche/state/unique_vertex.go index 917a77f..f7c6927 100644 --- a/snow/engine/avalanche/state/unique_vertex.go +++ b/snow/engine/avalanche/state/unique_vertex.go @@ -121,6 +121,12 @@ func (vtx *uniqueVertex) Parents() []avalanche.Vertex { return vtx.v.parents } +func (vtx *uniqueVertex) Height() uint64 { + vtx.refresh() + + return vtx.v.vtx.height +} + func (vtx *uniqueVertex) Txs() []snowstorm.Tx { vtx.refresh() diff --git a/snow/engine/avalanche/vertex_heap.go b/snow/engine/avalanche/vertex_heap.go new file mode 100644 index 0000000..59fff2e --- /dev/null +++ b/snow/engine/avalanche/vertex_heap.go @@ -0,0 +1,109 @@ +package avalanche + +import ( + "container/heap" + + "github.com/ava-labs/gecko/ids" + "github.com/ava-labs/gecko/snow/consensus/avalanche" +) + +// A vertexItem is a Vertex managed by the priority queue. +type vertexItem struct { + vertex avalanche.Vertex + index int // The index of the item in the heap. +} + +// A priorityQueue implements heap.Interface and holds vertexItems. +type priorityQueue []*vertexItem + +func (pq priorityQueue) Len() int { return len(pq) } + +func (pq priorityQueue) Less(i, j int) bool { + statusI := pq[i].vertex.Status() + statusJ := pq[j].vertex.Status() + + // Put unknown vertices at the front of the heap to ensure + // once we have made it below a certain height in DAG traversal + // we do not need to reset + if !statusI.Fetched() { + return true + } + if !statusJ.Fetched() { + return false + } + return pq[i].vertex.Height() > pq[j].vertex.Height() +} + +func (pq priorityQueue) Swap(i, j int) { + pq[i], pq[j] = pq[j], pq[i] + pq[i].index = i + pq[j].index = j +} + +func (pq *priorityQueue) Push(x interface{}) { + n := len(*pq) + item := x.(*vertexItem) + item.index = n + *pq = append(*pq, item) +} + +func (pq *priorityQueue) Pop() interface{} { + old := *pq + n := len(old) + item := old[n-1] + old[n-1] = nil + item.index = -1 + *pq = old[0 : n-1] + return item +} + +// vertexHeap defines the functionality of a heap of vertices +// with unique VertexIDs ordered by height +type vertexHeap interface { + Clear() + Push(avalanche.Vertex) + Pop() avalanche.Vertex // Requires that there be at least one element + Contains(avalanche.Vertex) bool + Len() int +} + +type maxHeightVertexHeap struct { + heap *priorityQueue + elementIDs ids.Set +} + +func NewMaxVertexHeap() *maxHeightVertexHeap { + return &maxHeightVertexHeap{ + heap: &priorityQueue{}, + elementIDs: ids.Set{}, + } +} + +func (vh *maxHeightVertexHeap) Clear() { + vh.heap = &priorityQueue{} + vh.elementIDs.Clear() +} + +func (vh *maxHeightVertexHeap) Push(vtx avalanche.Vertex) bool { + vtxID := vtx.ID() + if vh.elementIDs.Contains(vtxID) { + return false + } + + vh.elementIDs.Add(vtxID) + item := &vertexItem{ + vertex: vtx, + } + heap.Push(vh.heap, item) + return true +} + +func (vh *maxHeightVertexHeap) Pop() avalanche.Vertex { + vtx := heap.Pop(vh.heap).(*vertexItem).vertex + vh.elementIDs.Remove(vtx.ID()) + return vtx +} + +func (vh *maxHeightVertexHeap) Len() int { return vh.heap.Len() } + +func (vh *maxHeightVertexHeap) Contains(vtxID ids.ID) bool { return vh.elementIDs.Contains(vtxID) } diff --git a/snow/engine/avalanche/vertex_heap_test.go b/snow/engine/avalanche/vertex_heap_test.go new file mode 100644 index 0000000..5f27a18 --- /dev/null +++ b/snow/engine/avalanche/vertex_heap_test.go @@ -0,0 +1,130 @@ +package avalanche + +import ( + "testing" + + "github.com/ava-labs/gecko/snow/choices" + "github.com/ava-labs/gecko/snow/consensus/avalanche" +) + +// This example inserts several ints into an IntHeap, checks the minimum, +// and removes them in order of priority. +func TestUniqueVertexHeapReturnsOrdered(t *testing.T) { + h := NewMaxVertexHeap() + + vtx0 := &Vtx{ + id: GenerateID(), + height: 0, + status: choices.Processing, + } + + vtx1 := &Vtx{ + id: GenerateID(), + height: 1, + status: choices.Processing, + } + + vtx2 := &Vtx{ + id: GenerateID(), + height: 1, + status: choices.Processing, + } + + vtx3 := &Vtx{ + id: GenerateID(), + height: 3, + status: choices.Processing, + } + + vtx4 := &Vtx{ + id: GenerateID(), + status: choices.Unknown, + } + + vts := []avalanche.Vertex{vtx0, vtx1, vtx2, vtx3, vtx4} + + for _, vtx := range vts { + h.Push(vtx) + } + + vtxZ := h.Pop() + if !vtxZ.ID().Equals(vtx4.ID()) { + t.Fatalf("Heap did not pop unknonw element first") + } + + vtxA := h.Pop() + if vtxA.Height() != 3 { + t.Fatalf("First height from heap was incorrect") + } else if !vtxA.ID().Equals(vtx3.ID()) { + t.Fatalf("Incorrect ID on vertex popped from heap") + } + + vtxB := h.Pop() + if vtxB.Height() != 1 { + t.Fatalf("First height from heap was incorrect") + } else if !vtxB.ID().Equals(vtx1.ID()) && !vtxB.ID().Equals(vtx2.ID()) { + t.Fatalf("Incorrect ID on vertex popped from heap") + } + + vtxC := h.Pop() + if vtxC.Height() != 1 { + t.Fatalf("First height from heap was incorrect") + } else if !vtxC.ID().Equals(vtx1.ID()) && !vtxC.ID().Equals(vtx2.ID()) { + t.Fatalf("Incorrect ID on vertex popped from heap") + } + + if vtxB.ID().Equals(vtxC.ID()) { + t.Fatalf("Heap returned same element more than once") + } + + vtxD := h.Pop() + if vtxD.Height() != 0 { + t.Fatalf("Last height returned was incorrect") + } else if !vtxD.ID().Equals(vtx0.ID()) { + t.Fatalf("Last item from heap had incorrect ID") + } + + if h.Len() != 0 { + t.Fatalf("Heap was not empty after popping all of its elements") + } +} + +func TestUniqueVertexHeapRemainsUnique(t *testing.T) { + h := NewMaxVertexHeap() + + vtx0 := &Vtx{ + height: 0, + id: GenerateID(), + status: choices.Processing, + } + vtx1 := &Vtx{ + height: 1, + id: GenerateID(), + status: choices.Processing, + } + + sharedID := GenerateID() + vtx2 := &Vtx{ + height: 1, + id: sharedID, + status: choices.Processing, + } + + vtx3 := &Vtx{ + height: 2, + id: sharedID, + status: choices.Processing, + } + + pushed1 := h.Push(vtx0) + pushed2 := h.Push(vtx1) + pushed3 := h.Push(vtx2) + pushed4 := h.Push(vtx3) + if h.Len() != 3 { + t.Fatalf("Unique Vertex Heap has incorrect length: %d", h.Len()) + } else if !(pushed1 && pushed2 && pushed3) { + t.Fatalf("Failed to push a new unique element") + } else if pushed4 { + t.Fatalf("Pushed non-unique element to the unique vertex heap") + } +} diff --git a/snow/engine/avalanche/voter.go b/snow/engine/avalanche/voter.go index 581fe27..b943cb4 100644 --- a/snow/engine/avalanche/voter.go +++ b/snow/engine/avalanche/voter.go @@ -5,7 +5,6 @@ package avalanche import ( "github.com/ava-labs/gecko/ids" - "github.com/ava-labs/gecko/snow/consensus/avalanche" "github.com/ava-labs/gecko/snow/consensus/snowstorm" ) @@ -70,37 +69,46 @@ func (v *voter) Update() { func (v *voter) bubbleVotes(votes ids.UniqueBag) ids.UniqueBag { bubbledVotes := ids.UniqueBag{} + vertexHeap := NewMaxVertexHeap() for _, vote := range votes.List() { - set := votes.GetSet(vote) vtx, err := v.t.Config.State.GetVertex(vote) if err != nil { continue } - vts := []avalanche.Vertex{vtx} - for len(vts) > 0 { - vtx := vts[0] - vts = vts[1:] + vertexHeap.Push(vtx) + } - status := vtx.Status() - if !status.Fetched() { - v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is unknown", set.Len(), vtx.ID()) - continue - } + for vertexHeap.Len() > 0 { + vtx := vertexHeap.Pop() + vtxID := vtx.ID() + set := votes.GetSet(vtxID) + status := vtx.Status() - if status.Decided() { - v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is decided", set.Len(), vtx.ID()) - continue - } + if !status.Fetched() { + v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is unknown", set.Len(), vtxID) + bubbledVotes.RemoveSet(vtx.ID()) + continue + } - if v.t.Consensus.VertexIssued(vtx) { - v.t.Config.Context.Log.Verbo("Applying %d vote(s) for %s", set.Len(), vtx.ID()) - bubbledVotes.UnionSet(vtx.ID(), set) - } else { - v.t.Config.Context.Log.Verbo("Bubbling %d vote(s) for %s because the vertex isn't issued", set.Len(), vtx.ID()) - vts = append(vts, vtx.Parents()...) + if status.Decided() { + v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is decided", set.Len(), vtxID) + bubbledVotes.RemoveSet(vtx.ID()) + continue + } + + if v.t.Consensus.VertexIssued(vtx) { + v.t.Config.Context.Log.Verbo("Applying %d vote(s) for %s", set.Len(), vtx.ID()) + bubbledVotes.UnionSet(vtx.ID(), set) + } else { + v.t.Config.Context.Log.Verbo("Bubbling %d vote(s) for %s because the vertex isn't issued", set.Len(), vtx.ID()) + bubbledVotes.RemoveSet(vtx.ID()) // Remove votes for this vertex because it hasn't been issued + for _, parentVtx := range vtx.Parents() { + bubbledVotes.UnionSet(parentVtx.ID(), set) + vertexHeap.Push(parentVtx) } } } + return bubbledVotes }