Optimize bubble votes in avalanche with vertex heap (#47)

This commit is contained in:
aaronbuchwald 2020-06-03 14:07:44 -04:00 committed by GitHub
parent 0a5eb4dca8
commit 5d67251487
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 284 additions and 23 deletions

View File

@ -62,6 +62,9 @@ func (b *UniqueBag) Difference(diff *UniqueBag) {
// GetSet ...
func (b *UniqueBag) GetSet(id ID) BitSet { return (*b)[*id.ID] }
// RemoveSet ...
func (b *UniqueBag) RemoveSet(id ID) { delete(*b, id.Key()) }
// List ...
func (b *UniqueBag) List() []ID {
idList := []ID(nil)

View File

@ -77,6 +77,9 @@ type Vertex interface {
// Returns the vertices this vertex depends on
Parents() []Vertex
// Returns the height of this vertex
Height() uint64
// Returns a series of state transitions to be performed on acceptance
Txs() []snowstorm.Tx

View File

@ -16,7 +16,7 @@ type Vtx struct {
id ids.ID
txs []snowstorm.Tx
height int
height uint64
status choices.Status
bytes []byte
@ -25,6 +25,7 @@ type Vtx struct {
func (v *Vtx) ID() ids.ID { return v.id }
func (v *Vtx) ParentIDs() []ids.ID { return nil }
func (v *Vtx) Parents() []Vertex { return v.dependencies }
func (v *Vtx) Height() uint64 { return v.height }
func (v *Vtx) Txs() []snowstorm.Tx { return v.txs }
func (v *Vtx) Status() choices.Status { return v.status }
func (v *Vtx) Live() {}

View File

@ -27,7 +27,7 @@ type Vtx struct {
id ids.ID
txs []snowstorm.Tx
height int
height uint64
status choices.Status
bytes []byte
@ -36,6 +36,7 @@ type Vtx struct {
func (v *Vtx) ID() ids.ID { return v.id }
func (v *Vtx) DependencyIDs() []ids.ID { return nil }
func (v *Vtx) Parents() []avalanche.Vertex { return v.parents }
func (v *Vtx) Height() uint64 { return v.height }
func (v *Vtx) Txs() []snowstorm.Tx { return v.txs }
func (v *Vtx) Status() choices.Status { return v.status }
func (v *Vtx) Accept() error { v.status = choices.Accepted; return nil }

View File

@ -121,6 +121,12 @@ func (vtx *uniqueVertex) Parents() []avalanche.Vertex {
return vtx.v.parents
}
func (vtx *uniqueVertex) Height() uint64 {
vtx.refresh()
return vtx.v.vtx.height
}
func (vtx *uniqueVertex) Txs() []snowstorm.Tx {
vtx.refresh()

View File

@ -0,0 +1,109 @@
package avalanche
import (
"container/heap"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
)
// A vertexItem is a Vertex managed by the priority queue.
type vertexItem struct {
vertex avalanche.Vertex
index int // The index of the item in the heap.
}
// A priorityQueue implements heap.Interface and holds vertexItems.
type priorityQueue []*vertexItem
func (pq priorityQueue) Len() int { return len(pq) }
func (pq priorityQueue) Less(i, j int) bool {
statusI := pq[i].vertex.Status()
statusJ := pq[j].vertex.Status()
// Put unknown vertices at the front of the heap to ensure
// once we have made it below a certain height in DAG traversal
// we do not need to reset
if !statusI.Fetched() {
return true
}
if !statusJ.Fetched() {
return false
}
return pq[i].vertex.Height() > pq[j].vertex.Height()
}
func (pq priorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *priorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*vertexItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *priorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil
item.index = -1
*pq = old[0 : n-1]
return item
}
// vertexHeap defines the functionality of a heap of vertices
// with unique VertexIDs ordered by height
type vertexHeap interface {
Clear()
Push(avalanche.Vertex)
Pop() avalanche.Vertex // Requires that there be at least one element
Contains(avalanche.Vertex) bool
Len() int
}
type maxHeightVertexHeap struct {
heap *priorityQueue
elementIDs ids.Set
}
func NewMaxVertexHeap() *maxHeightVertexHeap {
return &maxHeightVertexHeap{
heap: &priorityQueue{},
elementIDs: ids.Set{},
}
}
func (vh *maxHeightVertexHeap) Clear() {
vh.heap = &priorityQueue{}
vh.elementIDs.Clear()
}
func (vh *maxHeightVertexHeap) Push(vtx avalanche.Vertex) bool {
vtxID := vtx.ID()
if vh.elementIDs.Contains(vtxID) {
return false
}
vh.elementIDs.Add(vtxID)
item := &vertexItem{
vertex: vtx,
}
heap.Push(vh.heap, item)
return true
}
func (vh *maxHeightVertexHeap) Pop() avalanche.Vertex {
vtx := heap.Pop(vh.heap).(*vertexItem).vertex
vh.elementIDs.Remove(vtx.ID())
return vtx
}
func (vh *maxHeightVertexHeap) Len() int { return vh.heap.Len() }
func (vh *maxHeightVertexHeap) Contains(vtxID ids.ID) bool { return vh.elementIDs.Contains(vtxID) }

View File

@ -0,0 +1,130 @@
package avalanche
import (
"testing"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
)
// This example inserts several ints into an IntHeap, checks the minimum,
// and removes them in order of priority.
func TestUniqueVertexHeapReturnsOrdered(t *testing.T) {
h := NewMaxVertexHeap()
vtx0 := &Vtx{
id: GenerateID(),
height: 0,
status: choices.Processing,
}
vtx1 := &Vtx{
id: GenerateID(),
height: 1,
status: choices.Processing,
}
vtx2 := &Vtx{
id: GenerateID(),
height: 1,
status: choices.Processing,
}
vtx3 := &Vtx{
id: GenerateID(),
height: 3,
status: choices.Processing,
}
vtx4 := &Vtx{
id: GenerateID(),
status: choices.Unknown,
}
vts := []avalanche.Vertex{vtx0, vtx1, vtx2, vtx3, vtx4}
for _, vtx := range vts {
h.Push(vtx)
}
vtxZ := h.Pop()
if !vtxZ.ID().Equals(vtx4.ID()) {
t.Fatalf("Heap did not pop unknonw element first")
}
vtxA := h.Pop()
if vtxA.Height() != 3 {
t.Fatalf("First height from heap was incorrect")
} else if !vtxA.ID().Equals(vtx3.ID()) {
t.Fatalf("Incorrect ID on vertex popped from heap")
}
vtxB := h.Pop()
if vtxB.Height() != 1 {
t.Fatalf("First height from heap was incorrect")
} else if !vtxB.ID().Equals(vtx1.ID()) && !vtxB.ID().Equals(vtx2.ID()) {
t.Fatalf("Incorrect ID on vertex popped from heap")
}
vtxC := h.Pop()
if vtxC.Height() != 1 {
t.Fatalf("First height from heap was incorrect")
} else if !vtxC.ID().Equals(vtx1.ID()) && !vtxC.ID().Equals(vtx2.ID()) {
t.Fatalf("Incorrect ID on vertex popped from heap")
}
if vtxB.ID().Equals(vtxC.ID()) {
t.Fatalf("Heap returned same element more than once")
}
vtxD := h.Pop()
if vtxD.Height() != 0 {
t.Fatalf("Last height returned was incorrect")
} else if !vtxD.ID().Equals(vtx0.ID()) {
t.Fatalf("Last item from heap had incorrect ID")
}
if h.Len() != 0 {
t.Fatalf("Heap was not empty after popping all of its elements")
}
}
func TestUniqueVertexHeapRemainsUnique(t *testing.T) {
h := NewMaxVertexHeap()
vtx0 := &Vtx{
height: 0,
id: GenerateID(),
status: choices.Processing,
}
vtx1 := &Vtx{
height: 1,
id: GenerateID(),
status: choices.Processing,
}
sharedID := GenerateID()
vtx2 := &Vtx{
height: 1,
id: sharedID,
status: choices.Processing,
}
vtx3 := &Vtx{
height: 2,
id: sharedID,
status: choices.Processing,
}
pushed1 := h.Push(vtx0)
pushed2 := h.Push(vtx1)
pushed3 := h.Push(vtx2)
pushed4 := h.Push(vtx3)
if h.Len() != 3 {
t.Fatalf("Unique Vertex Heap has incorrect length: %d", h.Len())
} else if !(pushed1 && pushed2 && pushed3) {
t.Fatalf("Failed to push a new unique element")
} else if pushed4 {
t.Fatalf("Pushed non-unique element to the unique vertex heap")
}
}

View File

@ -5,7 +5,6 @@ package avalanche
import (
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
)
@ -70,37 +69,46 @@ func (v *voter) Update() {
func (v *voter) bubbleVotes(votes ids.UniqueBag) ids.UniqueBag {
bubbledVotes := ids.UniqueBag{}
vertexHeap := NewMaxVertexHeap()
for _, vote := range votes.List() {
set := votes.GetSet(vote)
vtx, err := v.t.Config.State.GetVertex(vote)
if err != nil {
continue
}
vts := []avalanche.Vertex{vtx}
for len(vts) > 0 {
vtx := vts[0]
vts = vts[1:]
vertexHeap.Push(vtx)
}
status := vtx.Status()
if !status.Fetched() {
v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is unknown", set.Len(), vtx.ID())
continue
}
for vertexHeap.Len() > 0 {
vtx := vertexHeap.Pop()
vtxID := vtx.ID()
set := votes.GetSet(vtxID)
status := vtx.Status()
if status.Decided() {
v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is decided", set.Len(), vtx.ID())
continue
}
if !status.Fetched() {
v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is unknown", set.Len(), vtxID)
bubbledVotes.RemoveSet(vtx.ID())
continue
}
if v.t.Consensus.VertexIssued(vtx) {
v.t.Config.Context.Log.Verbo("Applying %d vote(s) for %s", set.Len(), vtx.ID())
bubbledVotes.UnionSet(vtx.ID(), set)
} else {
v.t.Config.Context.Log.Verbo("Bubbling %d vote(s) for %s because the vertex isn't issued", set.Len(), vtx.ID())
vts = append(vts, vtx.Parents()...)
if status.Decided() {
v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is decided", set.Len(), vtxID)
bubbledVotes.RemoveSet(vtx.ID())
continue
}
if v.t.Consensus.VertexIssued(vtx) {
v.t.Config.Context.Log.Verbo("Applying %d vote(s) for %s", set.Len(), vtx.ID())
bubbledVotes.UnionSet(vtx.ID(), set)
} else {
v.t.Config.Context.Log.Verbo("Bubbling %d vote(s) for %s because the vertex isn't issued", set.Len(), vtx.ID())
bubbledVotes.RemoveSet(vtx.ID()) // Remove votes for this vertex because it hasn't been issued
for _, parentVtx := range vtx.Parents() {
bubbledVotes.UnionSet(parentVtx.ID(), set)
vertexHeap.Push(parentVtx)
}
}
}
return bubbledVotes
}