
458 lines
13 KiB
Raw Normal View History

2020-03-10 12:20:34 -07:00
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package avalanche
import (
// TopologicalFactory implements Factory by returning a topological struct
type TopologicalFactory struct{}
// New implements Factory
func (TopologicalFactory) New() Consensus { return &Topological{} }
// TODO: Implement pruning of decisions.
// To perfectly preserve the protocol, this implementation will need to store
// the hashes of all accepted decisions. It is possible to add a heuristic that
// removes sufficiently old decisions. However, that will need to be analyzed to
// ensure safety. It is doable when adding in a weak synchrony assumption.
// Topological performs the avalanche algorithm by utilizing a topological sort
// of the voting results. Assumes that vertices are inserted in topological
// order.
type Topological struct {
2020-03-14 22:09:24 -07:00
2020-03-10 12:20:34 -07:00
// Context used for logging
ctx *snow.Context
// Threshold for confidence increases
params Parameters
// Maps vtxID -> vtx
nodes map[[32]byte]Vertex
// Tracks the conflict relations
cg snowstorm.Consensus
// preferred is the frontier of vtxIDs that are strongly preferred
// virtuous is the frontier of vtxIDs that are strongly virtuous
// orphans are the txIDs that are virtuous, but not preferred
preferred, virtuous, orphans ids.Set
// frontier is the set of vts that have no descendents
frontier map[[32]byte]Vertex
// preferenceCache is the cache for strongly preferred checks
// virtuousCache is the cache for strongly virtuous checks
preferenceCache, virtuousCache map[[32]byte]bool
type kahnNode struct {
inDegree int
votes ids.BitSet
// Initialize implements the Avalanche interface
func (ta *Topological) Initialize(ctx *snow.Context, params Parameters, frontier []Vertex) {
ta.ctx = ctx
ta.params = params
2020-03-14 22:09:24 -07:00
if err := ta.metrics.Initialize(ctx.Log, params.Namespace, params.Metrics); err != nil {
ta.ctx.Log.Error("%s", err)
2020-03-10 12:20:34 -07:00
ta.nodes = make(map[[32]byte]Vertex) = &snowstorm.Directed{}, params.Parameters) = make(map[[32]byte]Vertex)
for _, vtx := range frontier {[vtx.ID().Key()] = vtx
2020-03-10 12:20:34 -07:00
// Parameters implements the Avalanche interface
func (ta *Topological) Parameters() Parameters { return ta.params }
// IsVirtuous implements the Avalanche interface
func (ta *Topological) IsVirtuous(tx snowstorm.Tx) bool { return }
// Add implements the Avalanche interface
func (ta *Topological) Add(vtx Vertex) error {
2020-03-10 12:20:34 -07:00
ta.ctx.Log.AssertTrue(vtx != nil, "Attempting to insert nil vertex")
vtxID := vtx.ID()
key := vtxID.Key()
if vtx.Status().Decided() {
return nil // Already decided this vertex
2020-03-10 12:20:34 -07:00
} else if _, exists := ta.nodes[key]; exists {
return nil // Already inserted this vertex
2020-03-10 12:20:34 -07:00
ta.ctx.ConsensusDispatcher.Issue(ta.ctx.ChainID, vtxID, vtx.Bytes())
for _, tx := range vtx.Txs() {
if !tx.Status().Decided() {
// Add the consumers to the conflict graph.
if err :=; err != nil {
return err
2020-03-10 12:20:34 -07:00
ta.nodes[key] = vtx // Add this vertex to the set of nodes
2020-03-14 22:09:24 -07:00
2020-03-10 12:20:34 -07:00
return ta.update(vtx) // Update the vertex and it's ancestry
2020-03-10 12:20:34 -07:00
// VertexIssued implements the Avalanche interface
func (ta *Topological) VertexIssued(vtx Vertex) bool {
if vtx.Status().Decided() {
return true
_, ok := ta.nodes[vtx.ID().Key()]
return ok
// TxIssued implements the Avalanche interface
func (ta *Topological) TxIssued(tx snowstorm.Tx) bool { return }
// Orphans implements the Avalanche interface
func (ta *Topological) Orphans() ids.Set { return ta.orphans }
// Virtuous implements the Avalanche interface
func (ta *Topological) Virtuous() ids.Set { return ta.virtuous }
// Preferences implements the Avalanche interface
func (ta *Topological) Preferences() ids.Set { return ta.preferred }
// RecordPoll implements the Avalanche interface
func (ta *Topological) RecordPoll(responses ids.UniqueBag) error {
2020-03-10 12:20:34 -07:00
// Set up the topological sort: O(|Live Set|)
kahns, leaves := ta.calculateInDegree(responses)
// Collect the votes for each transaction: O(|Live Set|)
votes := ta.pushVotes(kahns, leaves)
// Update the conflict graph: O(|Transactions|)
ta.ctx.Log.Verbo("Updating consumer confidences based on:\n%s", &votes)
// Update the dag: O(|Live Set|)
return ta.updateFrontiers()
2020-03-10 12:20:34 -07:00
// Quiesce implements the Avalanche interface
func (ta *Topological) Quiesce() bool { return }
// Finalized implements the Avalanche interface
func (ta *Topological) Finalized() bool { return }
// Takes in a list of votes and sets up the topological ordering. Returns the
// reachable section of the graph annotated with the number of inbound edges and
// the non-transitively applied votes. Also returns the list of leaf nodes.
func (ta *Topological) calculateInDegree(
responses ids.UniqueBag) (map[[32]byte]kahnNode, []ids.ID) {
kahns := make(map[[32]byte]kahnNode)
leaves := ids.Set{}
for _, vote := range responses.List() {
key := vote.Key()
// If it is not found, then the vote is either for something decided,
// or something we haven't heard of yet.
if vtx := ta.nodes[key]; vtx != nil {
kahn, previouslySeen := kahns[key]
// Add this new vote to the current bag of votes
kahns[key] = kahn
if !previouslySeen {
// If I've never seen this node before, it is currently a leaf.
ta.markAncestorInDegrees(kahns, leaves, vtx.Parents())
return kahns, leaves.List()
// adds a new in-degree reference for all nodes
func (ta *Topological) markAncestorInDegrees(
kahns map[[32]byte]kahnNode,
leaves ids.Set,
deps []Vertex) (map[[32]byte]kahnNode, ids.Set) {
frontier := []Vertex{}
for _, vtx := range deps {
// The vertex may have been decided, no need to vote in that case
if !vtx.Status().Decided() {
frontier = append(frontier, vtx)
for len(frontier) > 0 {
newLen := len(frontier) - 1
current := frontier[newLen]
frontier = frontier[:newLen]
currentID := current.ID()
currentKey := currentID.Key()
kahn, alreadySeen := kahns[currentKey]
// I got here through a transitive edge, so increase the in-degree
kahns[currentKey] = kahn
if kahn.inDegree == 1 {
// If I am transitively seeing this node for the first
// time, it is no longer a leaf.
if !alreadySeen {
// If I am seeing this node for the first time, I need to check its
// parents
for _, depVtx := range current.Parents() {
// No need to traverse to a decided vertex
if !depVtx.Status().Decided() {
frontier = append(frontier, depVtx)
return kahns, leaves
// count the number of votes for each operation
func (ta *Topological) pushVotes(
kahnNodes map[[32]byte]kahnNode,
leaves []ids.ID) ids.Bag {
votes := make(ids.UniqueBag)
2020-03-10 13:10:53 -07:00
txConflicts := make(map[[32]byte]ids.Set)
2020-03-10 12:20:34 -07:00
for len(leaves) > 0 {
newLeavesSize := len(leaves) - 1
leaf := leaves[newLeavesSize]
leaves = leaves[:newLeavesSize]
key := leaf.Key()
kahn := kahnNodes[key]
if vtx := ta.nodes[key]; vtx != nil {
for _, tx := range vtx.Txs() {
// Give the votes to the consumer
txID := tx.ID()
votes.UnionSet(txID, kahn.votes)
2020-03-10 13:10:53 -07:00
// Map txID to set of Conflicts
txKey := txID.Key()
if _, exists := txConflicts[txKey]; !exists {
txConflicts[txKey] =
2020-03-10 12:20:34 -07:00
for _, dep := range vtx.Parents() {
depID := dep.ID()
depKey := depID.Key()
if depNode, notPruned := kahnNodes[depKey]; notPruned {
// Give the votes to my parents
kahnNodes[depKey] = depNode
if depNode.inDegree == 0 {
// Only traverse into the leaves
leaves = append(leaves, depID)
2020-03-10 13:10:53 -07:00
// Create bag of votes for conflicting transactions
conflictingVotes := make(ids.UniqueBag)
for txHash, conflicts := range txConflicts {
txID := ids.NewID(txHash)
for conflictTxHash := range conflicts {
conflictTxID := ids.NewID(conflictTxHash)
conflictingVotes.UnionSet(txID, votes.GetSet(conflictTxID))
2020-03-10 12:20:34 -07:00
return votes.Bag(ta.params.Alpha)
// If I've already checked, do nothing
// If I'm decided, cache the preference and return
// At this point, I must be live
// I now try to accept all my consumers
// I now update all my ancestors
// If any of my parents are rejected, reject myself
// If I'm preferred, remove all my ancestors from the preferred frontier, add
// myself to the preferred frontier
// If all my parents are accepted and I'm acceptable, accept myself
func (ta *Topological) update(vtx Vertex) error {
2020-03-10 12:20:34 -07:00
vtxID := vtx.ID()
vtxKey := vtxID.Key()
if _, cached := ta.preferenceCache[vtxKey]; cached {
return nil // This vertex has already been updated
2020-03-10 12:20:34 -07:00
switch vtx.Status() {
case choices.Accepted:
ta.preferred.Add(vtxID) // I'm preferred
ta.virtuous.Add(vtxID) // Accepted is defined as virtuous[vtxKey] = vtx // I have no descendents yet
ta.preferenceCache[vtxKey] = true
ta.virtuousCache[vtxKey] = true
return nil
2020-03-10 12:20:34 -07:00
case choices.Rejected:
// I'm rejected
ta.preferenceCache[vtxKey] = false
ta.virtuousCache[vtxKey] = false
return nil
2020-03-10 12:20:34 -07:00
acceptable := true // If the batch is accepted, this vertex is acceptable
rejectable := false // If I'm rejectable, I must be rejected
preferred := true
virtuous := true
txs := vtx.Txs()
preferences :=
virtuousTxs :=
for _, tx := range txs {
txID := tx.ID()
s := tx.Status()
if s == choices.Rejected {
// If I contain a rejected consumer, I am rejectable
rejectable = true
preferred = false
virtuous = false
if s != choices.Accepted {
// If I contain a non-accepted consumer, I am not acceptable
acceptable = false
preferred = preferred && preferences.Contains(txID)
virtuous = virtuous && virtuousTxs.Contains(txID)
deps := vtx.Parents()
// Update all of my dependencies
for _, dep := range deps {
if err := ta.update(dep); err != nil {
return err
2020-03-10 12:20:34 -07:00
depID := dep.ID()
key := depID.Key()
preferred = preferred && ta.preferenceCache[key]
virtuous = virtuous && ta.virtuousCache[key]
// Check my parent statuses
for _, dep := range deps {
if status := dep.Status(); status == choices.Rejected {
// My parent is rejected, so I should be rejected
if err := vtx.Reject(); err != nil {
return err
ta.ctx.ConsensusDispatcher.Reject(ta.ctx.ChainID, vtxID, vtx.Bytes())
2020-03-10 12:20:34 -07:00
delete(ta.nodes, vtxKey)
2020-03-14 22:09:24 -07:00
2020-03-10 12:20:34 -07:00
ta.preferenceCache[vtxKey] = false
ta.virtuousCache[vtxKey] = false
return nil
2020-03-10 12:20:34 -07:00
} else if status != choices.Accepted {
acceptable = false // My parent isn't accepted, so I can't be
// Technically, we could also check to see if there are direct conflicts
// between this vertex and a vertex in it's ancestry. If there does exist
// such a conflict, this vertex could also be rejected. However, this would
// require a traversal. Therefore, this memory optimization is ignored.
// Also, this will only happen from a byzantine node issuing the vertex.
// Therefore, this is very unlikely to actually be triggered in practice.
// Remove all my parents from the frontier
for _, dep := range deps {
delete(, dep.ID().Key())
}[vtxKey] = vtx // I have no descendents yet
ta.preferenceCache[vtxKey] = preferred
ta.virtuousCache[vtxKey] = virtuous
if preferred {
ta.preferred.Add(vtxID) // I'm preferred
for _, dep := range deps {
ta.preferred.Remove(dep.ID()) // My parents aren't part of the frontier
for _, tx := range txs {
if tx.Status() != choices.Accepted {
if virtuous {
ta.virtuous.Add(vtxID) // I'm virtuous
for _, dep := range deps {
ta.virtuous.Remove(dep.ID()) // My parents aren't part of the frontier
switch {
case acceptable:
// I'm acceptable, why not accept?
if err := vtx.Accept(); err != nil {
return err
2020-03-10 12:20:34 -07:00
ta.ctx.ConsensusDispatcher.Accept(ta.ctx.ChainID, vtxID, vtx.Bytes())
delete(ta.nodes, vtxKey)
2020-03-14 22:09:24 -07:00
2020-03-10 12:20:34 -07:00
case rejectable:
// I'm rejectable, why not reject?
if err := vtx.Reject(); err != nil {
return err
2020-03-10 12:20:34 -07:00
ta.ctx.ConsensusDispatcher.Reject(ta.ctx.ChainID, vtxID, vtx.Bytes())
delete(ta.nodes, vtxKey)
2020-03-14 22:09:24 -07:00
2020-03-10 12:20:34 -07:00
return nil
2020-03-10 12:20:34 -07:00
// Update the frontier sets
func (ta *Topological) updateFrontiers() error {
2020-03-10 12:20:34 -07:00
vts :=
ta.orphans.Clear() = make(map[[32]byte]Vertex)
ta.preferenceCache = make(map[[32]byte]bool)
ta.virtuousCache = make(map[[32]byte]bool)
ta.orphans.Union( // Initially, nothing is preferred
for _, vtx := range vts {
// Update all the vertices that were in my previous frontier
if err := ta.update(vtx); err != nil {
return err
2020-03-10 12:20:34 -07:00
return nil
2020-03-10 12:20:34 -07:00