Added returned errors to Accept and Reject in decidables

This commit is contained in:
StephenButtolph 2020-05-26 13:25:34 -04:00
parent 60668c3a91
commit 537e72714f
24 changed files with 259 additions and 149 deletions

View File

@ -22,12 +22,12 @@ type Decidable interface {
// Accept this element.
//
// This element will be accepted by every correct node in the network.
Accept()
Accept() error
// Reject this element.
//
// This element will not be accepted by any correct node in the network.
Reject()
Reject() error
// Status returns this element's current status.
//

View File

@ -34,8 +34,9 @@ type Consensus interface {
IsVirtuous(snowstorm.Tx) bool
// Adds a new decision. Assumes the dependencies have already been added.
// Assumes that mutations don't conflict with themselves.
Add(Vertex)
// Assumes that mutations don't conflict with themselves. Returns if a
// critical error has occurred.
Add(Vertex) error
// VertexIssued returns true iff Vertex has been added
VertexIssued(Vertex) bool
@ -54,8 +55,9 @@ type Consensus interface {
Preferences() ids.Set
// RecordPoll collects the results of a network poll. If a result has not
// been added, the result is dropped.
RecordPoll(ids.UniqueBag)
// been added, the result is dropped. Returns if a critical error has
// occurred.
RecordPoll(ids.UniqueBag) error
// Quiesce returns true iff all vertices that have been added but not been accepted or rejected are rogue.
// Note, it is possible that after returning quiesce, a new decision may be added such

View File

@ -74,7 +74,7 @@ func (ta *Topological) Initialize(ctx *snow.Context, params Parameters, frontier
for _, vtx := range frontier {
ta.frontier[vtx.ID().Key()] = vtx
}
ta.updateFrontiers()
ctx.Log.AssertNoError(ta.updateFrontiers())
}
// Parameters implements the Avalanche interface
@ -84,15 +84,15 @@ func (ta *Topological) Parameters() Parameters { return ta.params }
func (ta *Topological) IsVirtuous(tx snowstorm.Tx) bool { return ta.cg.IsVirtuous(tx) }
// Add implements the Avalanche interface
func (ta *Topological) Add(vtx Vertex) {
func (ta *Topological) Add(vtx Vertex) error {
ta.ctx.Log.AssertTrue(vtx != nil, "Attempting to insert nil vertex")
vtxID := vtx.ID()
key := vtxID.Key()
if vtx.Status().Decided() {
return // Already decided this vertex
return nil // Already decided this vertex
} else if _, exists := ta.nodes[key]; exists {
return // Already inserted this vertex
return nil // Already inserted this vertex
}
ta.ctx.ConsensusDispatcher.Issue(ta.ctx.ChainID, vtxID, vtx.Bytes())
@ -100,14 +100,16 @@ func (ta *Topological) Add(vtx Vertex) {
for _, tx := range vtx.Txs() {
if !tx.Status().Decided() {
// Add the consumers to the conflict graph.
ta.cg.Add(tx)
if err := ta.cg.Add(tx); err != nil {
return err
}
}
}
ta.nodes[key] = vtx // Add this vertex to the set of nodes
ta.metrics.Issued(vtxID)
ta.update(vtx) // Update the vertex and it's ancestry
return ta.update(vtx) // Update the vertex and it's ancestry
}
// VertexIssued implements the Avalanche interface
@ -132,7 +134,7 @@ func (ta *Topological) Virtuous() ids.Set { return ta.virtuous }
func (ta *Topological) Preferences() ids.Set { return ta.preferred }
// RecordPoll implements the Avalanche interface
func (ta *Topological) RecordPoll(responses ids.UniqueBag) {
func (ta *Topological) RecordPoll(responses ids.UniqueBag) error {
// Set up the topological sort: O(|Live Set|)
kahns, leaves := ta.calculateInDegree(responses)
// Collect the votes for each transaction: O(|Live Set|)
@ -141,7 +143,7 @@ func (ta *Topological) RecordPoll(responses ids.UniqueBag) {
ta.ctx.Log.Verbo("Updating consumer confidences based on:\n%s", &votes)
ta.cg.RecordPoll(votes)
// Update the dag: O(|Live Set|)
ta.updateFrontiers()
return ta.updateFrontiers()
}
// Quiesce implements the Avalanche interface
@ -275,11 +277,11 @@ func (ta *Topological) pushVotes(
// If I'm preferred, remove all my ancestors from the preferred frontier, add
// myself to the preferred frontier
// If all my parents are accepted and I'm acceptable, accept myself
func (ta *Topological) update(vtx Vertex) {
func (ta *Topological) update(vtx Vertex) error {
vtxID := vtx.ID()
vtxKey := vtxID.Key()
if _, cached := ta.preferenceCache[vtxKey]; cached {
return // This vertex has already been updated
return nil // This vertex has already been updated
}
switch vtx.Status() {
@ -291,12 +293,12 @@ func (ta *Topological) update(vtx Vertex) {
ta.preferenceCache[vtxKey] = true
ta.virtuousCache[vtxKey] = true
return
return nil
case choices.Rejected:
// I'm rejected
ta.preferenceCache[vtxKey] = false
ta.virtuousCache[vtxKey] = false
return
return nil
}
acceptable := true // If the batch is accepted, this vertex is acceptable
@ -327,7 +329,9 @@ func (ta *Topological) update(vtx Vertex) {
deps := vtx.Parents()
// Update all of my dependencies
for _, dep := range deps {
ta.update(dep)
if err := ta.update(dep); err != nil {
return err
}
depID := dep.ID()
key := depID.Key()
@ -338,13 +342,17 @@ func (ta *Topological) update(vtx Vertex) {
// Check my parent statuses
for _, dep := range deps {
if status := dep.Status(); status == choices.Rejected {
vtx.Reject() // My parent is rejected, so I should be rejected
// My parent is rejected, so I should be rejected
if err := vtx.Reject(); err != nil {
return err
}
ta.ctx.ConsensusDispatcher.Reject(ta.ctx.ChainID, vtxID, vtx.Bytes())
delete(ta.nodes, vtxKey)
ta.metrics.Rejected(vtxID)
ta.preferenceCache[vtxKey] = false
ta.virtuousCache[vtxKey] = false
return
return nil
} else if status != choices.Accepted {
acceptable = false // My parent isn't accepted, so I can't be
}
@ -389,21 +397,26 @@ func (ta *Topological) update(vtx Vertex) {
switch {
case acceptable:
// I'm acceptable, why not accept?
if err := vtx.Accept(); err != nil {
return err
}
ta.ctx.ConsensusDispatcher.Accept(ta.ctx.ChainID, vtxID, vtx.Bytes())
vtx.Accept()
delete(ta.nodes, vtxKey)
ta.metrics.Accepted(vtxID)
case rejectable:
// I'm rejectable, why not reject?
vtx.Reject()
if err := vtx.Reject(); err != nil {
return err
}
ta.ctx.ConsensusDispatcher.Reject(ta.ctx.ChainID, vtxID, vtx.Bytes())
delete(ta.nodes, vtxKey)
ta.metrics.Rejected(vtxID)
}
return nil
}
// Update the frontier sets
func (ta *Topological) updateFrontiers() {
func (ta *Topological) updateFrontiers() error {
vts := ta.frontier
ta.preferred.Clear()
@ -417,6 +430,9 @@ func (ta *Topological) updateFrontiers() {
for _, vtx := range vts {
// Update all the vertices that were in my previous frontier
ta.update(vtx)
if err := ta.update(vtx); err != nil {
return err
}
}
return nil
}

View File

@ -28,8 +28,8 @@ func (v *Vtx) Parents() []Vertex { return v.dependencies }
func (v *Vtx) Txs() []snowstorm.Tx { return v.txs }
func (v *Vtx) Status() choices.Status { return v.status }
func (v *Vtx) Live() {}
func (v *Vtx) Accept() { v.status = choices.Accepted }
func (v *Vtx) Reject() { v.status = choices.Rejected }
func (v *Vtx) Accept() error { v.status = choices.Accepted; return nil }
func (v *Vtx) Reject() error { v.status = choices.Rejected; return nil }
func (v *Vtx) Bytes() []byte { return v.bytes }
type sortVts []*Vtx

View File

@ -4,6 +4,7 @@
package snowman
import (
"errors"
"sort"
"github.com/ava-labs/gecko/ids"
@ -21,17 +22,19 @@ type TestBlock struct {
func (b *TestBlock) Parent() Block { return b.parent }
func (b *TestBlock) ID() ids.ID { return b.id }
func (b *TestBlock) Status() choices.Status { return b.status }
func (b *TestBlock) Accept() {
func (b *TestBlock) Accept() error {
if b.status.Decided() && b.status != choices.Accepted {
panic("Dis-agreement")
return errors.New("Dis-agreement")
}
b.status = choices.Accepted
return nil
}
func (b *TestBlock) Reject() {
func (b *TestBlock) Reject() error {
if b.status.Decided() && b.status != choices.Rejected {
panic("Dis-agreement")
return errors.New("Dis-agreement")
}
b.status = choices.Rejected
return nil
}
func (b *TestBlock) Verify() error { return nil }
func (b *TestBlock) Bytes() []byte { return b.bytes }

View File

@ -19,7 +19,8 @@ type Consensus interface {
Parameters() snowball.Parameters
// Adds a new decision. Assumes the dependency has already been added.
Add(Block)
// Returns if a critical error has occurred.
Add(Block) error
// Issued returns true if the block has been issued into consensus
Issued(Block) bool
@ -29,8 +30,8 @@ type Consensus interface {
Preference() ids.ID
// RecordPoll collects the results of a network poll. Assumes all decisions
// have been previously added.
RecordPoll(ids.Bag)
// have been previously added. Returns if a critical error has occurred.
RecordPoll(ids.Bag) error
// Finalized returns true if all decisions that have been added have been
// finalized. Note, it is possible that after returning finalized, a new

View File

@ -77,7 +77,7 @@ func (ts *Topological) Initialize(ctx *snow.Context, params snowball.Parameters,
func (ts *Topological) Parameters() snowball.Parameters { return ts.params }
// Add implements the Snowman interface
func (ts *Topological) Add(blk Block) {
func (ts *Topological) Add(blk Block) error {
parent := blk.Parent()
parentID := parent.ID()
parentKey := parentID.Key()
@ -95,13 +95,15 @@ func (ts *Topological) Add(blk Block) {
// If the ancestor is missing, this means the ancestor must have already
// been pruned. Therefore, the dependent should be transitively
// rejected.
blk.Reject()
if err := blk.Reject(); err != nil {
return err
}
// Notify anyone listening that this block was rejected.
ts.ctx.DecisionDispatcher.Reject(ts.ctx.ChainID, blkID, blkBytes)
ts.ctx.ConsensusDispatcher.Reject(ts.ctx.ChainID, blkID, blkBytes)
ts.metrics.Rejected(blkID)
return
return nil
}
// add the block as a child of its parent, and add the block to the tree
@ -115,6 +117,7 @@ func (ts *Topological) Add(blk Block) {
if ts.tail.Equals(parentID) {
ts.tail = blkID
}
return nil
}
// Issued implements the Snowman interface
@ -154,7 +157,7 @@ func (ts *Topological) Preference() ids.ID { return ts.tail }
// The complexity of this function is:
// - Runtime = 3 * |live set| + |votes|
// - Space = 2 * |live set| + |votes|
func (ts *Topological) RecordPoll(votes ids.Bag) {
func (ts *Topological) RecordPoll(votes ids.Bag) error {
// Runtime = |live set| + |votes| ; Space = |live set| + |votes|
kahnGraph, leaves := ts.calculateInDegree(votes)
@ -162,10 +165,14 @@ func (ts *Topological) RecordPoll(votes ids.Bag) {
voteStack := ts.pushVotes(kahnGraph, leaves)
// Runtime = |live set| ; Space = Constant
preferred := ts.vote(voteStack)
preferred, err := ts.vote(voteStack)
if err != nil {
return err
}
// Runtime = |live set| ; Space = Constant
ts.tail = ts.getPreferredDecendent(preferred)
return nil
}
// Finalized implements the Snowman interface
@ -292,7 +299,7 @@ func (ts *Topological) pushVotes(
}
// apply votes to the branch that received an Alpha threshold
func (ts *Topological) vote(voteStack []votes) ids.ID {
func (ts *Topological) vote(voteStack []votes) (ids.ID, error) {
// If the voteStack is empty, then the full tree should falter. This won't
// change the preferred branch.
if len(voteStack) == 0 {
@ -301,7 +308,7 @@ func (ts *Topological) vote(voteStack []votes) ids.ID {
headKey := ts.head.Key()
headBlock := ts.blocks[headKey]
headBlock.shouldFalter = true
return ts.tail
return ts.tail, nil
}
// keep track of the new preferred block
@ -341,7 +348,9 @@ func (ts *Topological) vote(voteStack []votes) ids.ID {
// Only accept when you are finalized and the head.
if parentBlock.sb.Finalized() && ts.head.Equals(vote.parentID) {
ts.accept(parentBlock)
if err := ts.accept(parentBlock); err != nil {
return ids.ID{}, err
}
// by accepting the child of parentBlock, the last accepted block is
// no longer voteParentID, but its child. So, voteParentID can be
@ -393,7 +402,7 @@ func (ts *Topological) vote(voteStack []votes) ids.ID {
}
}
}
return newPreferred
return newPreferred, nil
}
// Get the preferred decendent of the provided block ID
@ -409,7 +418,7 @@ func (ts *Topological) getPreferredDecendent(blkID ids.ID) ids.ID {
// accept the preferred child of the provided snowman block. By accepting the
// preferred child, all other children will be rejected. When these children are
// rejected, all their descendants will be rejected.
func (ts *Topological) accept(n *snowmanBlock) {
func (ts *Topological) accept(n *snowmanBlock) error {
// We are finalizing the block's child, so we need to get the preference
pref := n.sb.Preference()
@ -417,7 +426,9 @@ func (ts *Topological) accept(n *snowmanBlock) {
// Get the child and accept it
child := n.children[pref.Key()]
child.Accept()
if err := child.Accept(); err != nil {
return err
}
// Notify anyone listening that this block was accepted.
bytes := child.Bytes()
@ -439,7 +450,9 @@ func (ts *Topological) accept(n *snowmanBlock) {
continue
}
child.Reject()
if err := child.Reject(); err != nil {
return err
}
// Notify anyone listening that this block was rejected.
bytes := child.Bytes()
@ -452,11 +465,11 @@ func (ts *Topological) accept(n *snowmanBlock) {
}
// reject all the descendants of the blocks we just rejected
ts.rejectTransitively(rejects)
return ts.rejectTransitively(rejects)
}
// Takes in a list of rejected ids and rejects all descendants of these IDs
func (ts *Topological) rejectTransitively(rejected []ids.ID) {
func (ts *Topological) rejectTransitively(rejected []ids.ID) error {
// the rejected array is treated as a queue, with the next element at index
// 0 and the last element at the end of the slice.
for len(rejected) > 0 {
@ -471,7 +484,9 @@ func (ts *Topological) rejectTransitively(rejected []ids.ID) {
delete(ts.blocks, rejectedKey)
for childIDKey, child := range rejectedNode.children {
child.Reject()
if err := child.Reject(); err != nil {
return err
}
// Notify anyone listening that this block was rejected.
childID := ids.NewID(childIDKey)
@ -484,4 +499,5 @@ func (ts *Topological) rejectTransitively(rejected []ids.ID) {
rejected = append(rejected, childID)
}
}
return nil
}

View File

@ -28,8 +28,9 @@ type Consensus interface {
// That is, no transaction has been added that conflicts with <Tx>
IsVirtuous(Tx) bool
// Adds a new transaction to vote on
Add(Tx)
// Adds a new transaction to vote on. Returns if a critical error has
// occurred.
Add(Tx) error
// Returns true iff transaction <Tx> has been added
Issued(Tx) bool
@ -45,8 +46,8 @@ type Consensus interface {
Conflicts(Tx) ids.Set
// Collects the results of a network poll. Assumes all transactions
// have been previously added
RecordPoll(ids.Bag)
// have been previously added. Returns if a critical error has occurred.
RecordPoll(ids.Bag) error
// Returns true iff all remaining transactions are rogue. Note, it is
// possible that after returning quiesce, a new decision may be added such

View File

@ -14,6 +14,7 @@ import (
"github.com/ava-labs/gecko/snow/consensus/snowball"
"github.com/ava-labs/gecko/snow/events"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/wrappers"
)
// DirectedFactory implements Factory by returning a directed struct
@ -54,6 +55,8 @@ type Directed struct {
// Number of times RecordPoll has been called
currentVote int
errs wrappers.Errs
}
type flatNode struct {
@ -118,9 +121,9 @@ func (dg *Directed) Conflicts(tx Tx) ids.Set {
}
// Add implements the Consensus interface
func (dg *Directed) Add(tx Tx) {
func (dg *Directed) Add(tx Tx) error {
if dg.Issued(tx) {
return // Already inserted
return nil // Already inserted
}
txID := tx.ID()
@ -130,11 +133,13 @@ func (dg *Directed) Add(tx Tx) {
inputs := tx.InputIDs()
// If there are no inputs, Tx is vacuously accepted
if inputs.Len() == 0 {
tx.Accept()
if err := tx.Accept(); err != nil {
return err
}
dg.ctx.DecisionDispatcher.Accept(dg.ctx.ChainID, txID, bytes)
dg.metrics.Issued(txID)
dg.metrics.Accepted(txID)
return
return nil
}
fn := &flatNode{tx: tx}
@ -195,6 +200,7 @@ func (dg *Directed) Add(tx Tx) {
}
}
dg.pendingReject.Register(toReject)
return dg.errs.Err
}
// Issued implements the Consensus interface
@ -213,7 +219,7 @@ func (dg *Directed) Virtuous() ids.Set { return dg.virtuous }
func (dg *Directed) Preferences() ids.Set { return dg.preferences }
// RecordPoll implements the Consensus interface
func (dg *Directed) RecordPoll(votes ids.Bag) {
func (dg *Directed) RecordPoll(votes ids.Bag) error {
dg.currentVote++
votes.SetThreshold(dg.params.Alpha)
@ -231,7 +237,8 @@ func (dg *Directed) RecordPoll(votes ids.Bag) {
}
fn.lastVote = dg.currentVote
dg.ctx.Log.Verbo("Increasing (bias, confidence) of %s from (%d, %d) to (%d, %d)", toInc, fn.bias, fn.confidence, fn.bias+1, fn.confidence+1)
dg.ctx.Log.Verbo("Increasing (bias, confidence) of %s from (%d, %d) to (%d, %d)",
toInc, fn.bias, fn.confidence, fn.bias+1, fn.confidence+1)
fn.bias++
fn.confidence++
@ -240,17 +247,22 @@ func (dg *Directed) RecordPoll(votes ids.Bag) {
((!fn.rogue && fn.confidence >= dg.params.BetaVirtuous) ||
fn.confidence >= dg.params.BetaRogue) {
dg.deferAcceptance(fn)
if dg.errs.Errored() {
return dg.errs.Err
}
}
if !fn.accepted {
dg.redirectEdges(fn)
}
}
return dg.errs.Err
}
// Quiesce implements the Consensus interface
func (dg *Directed) Quiesce() bool {
numVirtuous := dg.virtuousVoting.Len()
dg.ctx.Log.Verbo("Conflict graph has %d voting virtuous transactions and %d transactions", numVirtuous, len(dg.nodes))
dg.ctx.Log.Verbo("Conflict graph has %d voting virtuous transactions and %d transactions",
numVirtuous, len(dg.nodes))
return numVirtuous == 0
}
@ -311,7 +323,7 @@ func (dg *Directed) deferAcceptance(fn *flatNode) {
dg.pendingAccept.Register(toAccept)
}
func (dg *Directed) reject(ids ...ids.ID) {
func (dg *Directed) reject(ids ...ids.ID) error {
for _, conflict := range ids {
conflictKey := conflict.Key()
conf := dg.nodes[conflictKey]
@ -324,13 +336,16 @@ func (dg *Directed) reject(ids ...ids.ID) {
dg.removeConflict(conflict, conf.outs.List()...)
// Mark it as rejected
conf.tx.Reject()
if err := conf.tx.Reject(); err != nil {
return err
}
dg.ctx.DecisionDispatcher.Reject(dg.ctx.ChainID, conf.tx.ID(), conf.tx.Bytes())
dg.metrics.Rejected(conflict)
dg.pendingAccept.Abandon(conflict)
dg.pendingReject.Fulfill(conflict)
}
return nil
}
func (dg *Directed) redirectEdges(fn *flatNode) {
@ -396,7 +411,7 @@ func (a *directedAccepter) Abandon(id ids.ID) { a.rejected = true }
func (a *directedAccepter) Update() {
// If I was rejected or I am still waiting on dependencies to finish do nothing.
if a.rejected || a.deps.Len() != 0 {
if a.rejected || a.deps.Len() != 0 || a.dg.errs.Errored() {
return
}
@ -410,12 +425,22 @@ func (a *directedAccepter) Update() {
a.dg.preferences.Remove(id)
// Reject the conflicts
a.dg.reject(a.fn.ins.List()...)
a.dg.reject(a.fn.outs.List()...) // Should normally be empty
if err := a.dg.reject(a.fn.ins.List()...); err != nil {
a.dg.errs.Add(err)
return
}
// Should normally be empty
if err := a.dg.reject(a.fn.outs.List()...); err != nil {
a.dg.errs.Add(err)
return
}
// Mark it as accepted
if err := a.fn.tx.Accept(); err != nil {
a.dg.errs.Add(err)
return
}
a.fn.accepted = true
a.fn.tx.Accept()
a.dg.ctx.DecisionDispatcher.Accept(a.dg.ctx.ChainID, id, a.fn.tx.Bytes())
a.dg.metrics.Accepted(id)
@ -434,11 +459,11 @@ type directedRejector struct {
func (r *directedRejector) Dependencies() ids.Set { return r.deps }
func (r *directedRejector) Fulfill(id ids.ID) {
if r.rejected {
if r.rejected || r.dg.errs.Errored() {
return
}
r.rejected = true
r.dg.reject(r.fn.tx.ID())
r.dg.errs.Add(r.dg.reject(r.fn.tx.ID()))
}
func (*directedRejector) Abandon(id ids.ID) {}

View File

@ -14,6 +14,7 @@ import (
"github.com/ava-labs/gecko/snow/consensus/snowball"
"github.com/ava-labs/gecko/snow/events"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/wrappers"
)
// InputFactory implements Factory by returning an input struct
@ -43,6 +44,8 @@ type Input struct {
// Number of times RecordPoll has been called
currentVote int
errs wrappers.Errs
}
type txNode struct {
@ -92,9 +95,9 @@ func (ig *Input) IsVirtuous(tx Tx) bool {
}
// Add implements the ConflictGraph interface
func (ig *Input) Add(tx Tx) {
func (ig *Input) Add(tx Tx) error {
if ig.Issued(tx) {
return // Already inserted
return nil // Already inserted
}
txID := tx.ID()
@ -104,11 +107,13 @@ func (ig *Input) Add(tx Tx) {
inputs := tx.InputIDs()
// If there are no inputs, they are vacuously accepted
if inputs.Len() == 0 {
tx.Accept()
if err := tx.Accept(); err != nil {
return err
}
ig.ctx.DecisionDispatcher.Accept(ig.ctx.ChainID, txID, bytes)
ig.metrics.Issued(txID)
ig.metrics.Accepted(txID)
return
return nil
}
cn := txNode{tx: tx}
@ -155,6 +160,7 @@ func (ig *Input) Add(tx Tx) {
}
}
ig.pendingReject.Register(toReject)
return ig.errs.Err
}
// Issued implements the ConflictGraph interface
@ -187,7 +193,7 @@ func (ig *Input) Conflicts(tx Tx) ids.Set {
}
// RecordPoll implements the ConflictGraph interface
func (ig *Input) RecordPoll(votes ids.Bag) {
func (ig *Input) RecordPoll(votes ids.Bag) error {
ig.currentVote++
votes.SetThreshold(ig.params.Alpha)
@ -261,11 +267,15 @@ func (ig *Input) RecordPoll(votes ids.Bag) {
if (!rogue && confidence >= ig.params.BetaVirtuous) ||
confidence >= ig.params.BetaRogue {
ig.deferAcceptance(tx)
if ig.errs.Errored() {
return ig.errs.Err
}
continue
}
ig.txs[incKey] = tx
}
return ig.errs.Err
}
func (ig *Input) deferAcceptance(tn txNode) {
@ -285,7 +295,7 @@ func (ig *Input) deferAcceptance(tn txNode) {
}
// reject all the ids and remove them from their conflict sets
func (ig *Input) reject(ids ...ids.ID) {
func (ig *Input) reject(ids ...ids.ID) error {
for _, conflict := range ids {
conflictKey := conflict.Key()
cn := ig.txs[conflictKey]
@ -296,12 +306,15 @@ func (ig *Input) reject(ids ...ids.ID) {
ig.removeConflict(conflict, cn.tx.InputIDs().List()...)
// Mark it as rejected
cn.tx.Reject()
if err := cn.tx.Reject(); err != nil {
return err
}
ig.ctx.DecisionDispatcher.Reject(ig.ctx.ChainID, cn.tx.ID(), cn.tx.Bytes())
ig.metrics.Rejected(conflict)
ig.pendingAccept.Abandon(conflict)
ig.pendingReject.Fulfill(conflict)
}
return nil
}
// Remove id from all of its conflict sets
@ -458,7 +471,7 @@ func (a *inputAccepter) Fulfill(id ids.ID) {
func (a *inputAccepter) Abandon(id ids.ID) { a.rejected = true }
func (a *inputAccepter) Update() {
if a.rejected || a.deps.Len() != 0 {
if a.rejected || a.deps.Len() != 0 || a.ig.errs.Errored() {
return
}
@ -480,10 +493,16 @@ func (a *inputAccepter) Update() {
conflicts.Union(inputNode.conflicts)
}
}
a.ig.reject(conflicts.List()...)
if err := a.ig.reject(conflicts.List()...); err != nil {
a.ig.errs.Add(err)
return
}
// Mark it as accepted
a.tn.tx.Accept()
if err := a.tn.tx.Accept(); err != nil {
a.ig.errs.Add(err)
return
}
a.ig.ctx.DecisionDispatcher.Accept(a.ig.ctx.ChainID, id, a.tn.tx.Bytes())
a.ig.metrics.Accepted(id)
@ -502,11 +521,11 @@ type inputRejector struct {
func (r *inputRejector) Dependencies() ids.Set { return r.deps }
func (r *inputRejector) Fulfill(id ids.ID) {
if r.rejected {
if r.rejected || r.ig.errs.Errored() {
return
}
r.rejected = true
r.ig.reject(r.tn.tx.ID())
r.ig.errs.Add(r.ig.reject(r.tn.tx.ID()))
}
func (*inputRejector) Abandon(id ids.ID) {}

View File

@ -31,10 +31,10 @@ func (tx *TestTx) InputIDs() ids.Set { return tx.Ins }
func (tx *TestTx) Status() choices.Status { return tx.Stat }
// Accept implements the Consumer interface
func (tx *TestTx) Accept() { tx.Stat = choices.Accepted }
func (tx *TestTx) Accept() error { tx.Stat = choices.Accepted; return nil }
// Reject implements the Consumer interface
func (tx *TestTx) Reject() { tx.Stat = choices.Rejected }
func (tx *TestTx) Reject() error { tx.Stat = choices.Rejected; return nil }
// Reset sets the status to pending
func (tx *TestTx) Reset() { tx.Stat = choices.Processing }

View File

@ -38,8 +38,8 @@ func (v *Vtx) DependencyIDs() []ids.ID { return nil }
func (v *Vtx) Parents() []avalanche.Vertex { return v.parents }
func (v *Vtx) Txs() []snowstorm.Tx { return v.txs }
func (v *Vtx) Status() choices.Status { return v.status }
func (v *Vtx) Accept() { v.status = choices.Accepted }
func (v *Vtx) Reject() { v.status = choices.Rejected }
func (v *Vtx) Accept() error { v.status = choices.Accepted; return nil }
func (v *Vtx) Reject() error { v.status = choices.Rejected; return nil }
func (v *Vtx) Bytes() []byte { return v.bytes }
type sortVts []*Vtx

View File

@ -76,7 +76,7 @@ func (vtx *uniqueVertex) setStatus(status choices.Status) {
func (vtx *uniqueVertex) ID() ids.ID { return vtx.vtxID }
func (vtx *uniqueVertex) Accept() {
func (vtx *uniqueVertex) Accept() error {
vtx.setStatus(choices.Accepted)
vtx.serializer.edge.Add(vtx.vtxID)
@ -90,17 +90,17 @@ func (vtx *uniqueVertex) Accept() {
// parents to be garbage collected
vtx.v.parents = nil
vtx.serializer.db.Commit()
return vtx.serializer.db.Commit()
}
func (vtx *uniqueVertex) Reject() {
func (vtx *uniqueVertex) Reject() error {
vtx.setStatus(choices.Rejected)
// Should never traverse into parents of a decided vertex. Allows for the
// parents to be garbage collected
vtx.v.parents = nil
vtx.serializer.db.Commit()
return vtx.serializer.db.Commit()
}
func (vtx *uniqueVertex) Status() choices.Status { vtx.refresh(); return vtx.v.status }

View File

@ -34,8 +34,8 @@ type Blk struct {
func (b *Blk) ID() ids.ID { return b.id }
func (b *Blk) Parent() snowman.Block { return b.parent }
func (b *Blk) Accept() { b.status = choices.Accepted }
func (b *Blk) Reject() { b.status = choices.Rejected }
func (b *Blk) Accept() error { b.status = choices.Accepted; return nil }
func (b *Blk) Reject() error { b.status = choices.Rejected; return nil }
func (b *Blk) Status() choices.Status { return b.status }
func (b *Blk) Verify() error { return b.validity }
func (b *Blk) Bytes() []byte { return b.bytes }

View File

@ -99,12 +99,12 @@ func (tx *UniqueTx) setStatus(status choices.Status) error {
func (tx *UniqueTx) ID() ids.ID { return tx.txID }
// Accept is called when the transaction was finalized as accepted by consensus
func (tx *UniqueTx) Accept() {
func (tx *UniqueTx) Accept() error {
defer tx.vm.db.Abort()
if err := tx.setStatus(choices.Accepted); err != nil {
tx.vm.ctx.Log.Error("Failed to accept tx %s due to %s", tx.txID, err)
return
return err
}
// Remove spent utxos
@ -116,7 +116,7 @@ func (tx *UniqueTx) Accept() {
utxoID := utxo.InputID()
if err := tx.vm.state.SpendUTXO(utxoID); err != nil {
tx.vm.ctx.Log.Error("Failed to spend utxo %s due to %s", utxoID, err)
return
return err
}
}
@ -124,7 +124,7 @@ func (tx *UniqueTx) Accept() {
for _, utxo := range tx.UTXOs() {
if err := tx.vm.state.FundUTXO(utxo); err != nil {
tx.vm.ctx.Log.Error("Failed to fund utxo %s due to %s", utxoID, err)
return
return err
}
}
@ -132,12 +132,12 @@ func (tx *UniqueTx) Accept() {
commitBatch, err := tx.vm.db.CommitBatch()
if err != nil {
tx.vm.ctx.Log.Error("Failed to calculate CommitBatch for %s due to %s", txID, err)
return
return err
}
if err := tx.ExecuteWithSideEffects(tx.vm, commitBatch); err != nil {
tx.vm.ctx.Log.Error("Failed to commit accept %s due to %s", txID, err)
return
return err
}
tx.vm.ctx.Log.Verbo("Accepted Tx: %s", txID)
@ -149,15 +149,17 @@ func (tx *UniqueTx) Accept() {
if tx.onDecide != nil {
tx.onDecide(choices.Accepted)
}
return nil
}
// Reject is called when the transaction was finalized as rejected by consensus
func (tx *UniqueTx) Reject() {
func (tx *UniqueTx) Reject() error {
defer tx.vm.db.Abort()
if err := tx.setStatus(choices.Rejected); err != nil {
tx.vm.ctx.Log.Error("Failed to reject tx %s due to %s", tx.txID, err)
return
return err
}
txID := tx.ID()
@ -165,6 +167,7 @@ func (tx *UniqueTx) Reject() {
if err := tx.vm.db.Commit(); err != nil {
tx.vm.ctx.Log.Error("Failed to commit reject %s due to %s", tx.txID, err)
return err
}
tx.vm.pubsub.Publish("rejected", txID)
@ -174,6 +177,8 @@ func (tx *UniqueTx) Reject() {
if tx.onDecide != nil {
tx.onDecide(choices.Rejected)
}
return nil
}
// Status returns the current status of this transaction

View File

@ -53,18 +53,28 @@ func (b *Block) Parent() snowman.Block {
// Accept sets this block's status to Accepted and sets lastAccepted to this
// block's ID and saves this info to b.vm.DB
// Recall that b.vm.DB.Commit() must be called to persist to the DB
func (b *Block) Accept() {
b.SetStatus(choices.Accepted) // Change state of this block
b.VM.State.PutStatus(b.VM.DB, b.ID(), choices.Accepted) // Persist data
b.VM.State.PutLastAccepted(b.VM.DB, b.ID())
b.VM.LastAcceptedID = b.ID() // Change state of VM
func (b *Block) Accept() error {
b.SetStatus(choices.Accepted) // Change state of this block
blkID := b.ID()
// Persist data
if err := b.VM.State.PutStatus(b.VM.DB, blkID, choices.Accepted); err != nil {
return err
}
if err := b.VM.State.PutLastAccepted(b.VM.DB, blkID); err != nil {
return err
}
b.VM.LastAcceptedID = blkID // Change state of VM
return nil
}
// Reject sets this block's status to Rejected and saves the status in state
// Recall that b.vm.DB.Commit() must be called to persist to the DB
func (b *Block) Reject() {
func (b *Block) Reject() error {
b.SetStatus(choices.Rejected)
b.VM.State.PutStatus(b.VM.DB, b.ID(), choices.Rejected)
return b.VM.State.PutStatus(b.VM.DB, b.ID(), choices.Rejected)
}
// Status returns the status of this block

View File

@ -22,10 +22,10 @@ type Block struct{ BlkID ids.ID }
func (mb *Block) ID() ids.ID { return mb.BlkID }
// Accept ...
func (*Block) Accept() { panic(errMissingBlock) }
func (*Block) Accept() error { return errMissingBlock }
// Reject ...
func (*Block) Reject() { panic(errMissingBlock) }
func (*Block) Reject() error { return errMissingBlock }
// Status ...
func (*Block) Status() choices.Status { return choices.Unknown }

View File

@ -24,22 +24,9 @@ func TestMissingBlock(t *testing.T) {
t.Fatalf("missingBlock.Verify returned nil, expected an error")
} else if bytes := mb.Bytes(); bytes != nil {
t.Fatalf("missingBlock.Bytes returned %v, expected %v", bytes, nil)
} else if err := mb.Accept(); err == nil {
t.Fatalf("missingBlock.Accept should have returned an error")
} else if err := mb.Reject(); err == nil {
t.Fatalf("missingBlock.Reject should have returned an error")
}
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("Should have panicked on accept")
}
}()
mb.Accept()
}()
func() {
defer func() {
if r := recover(); r == nil {
t.Fatalf("Should have panicked on reject")
}
}()
mb.Reject()
}()
}

View File

@ -96,24 +96,29 @@ func (ab *AtomicBlock) Verify() error {
}
// Accept implements the snowman.Block interface
func (ab *AtomicBlock) Accept() {
func (ab *AtomicBlock) Accept() error {
ab.vm.Ctx.Log.Verbo("Accepting block with ID %s", ab.ID())
ab.CommonBlock.Accept()
if err := ab.CommonBlock.Accept(); err != nil {
return err
}
// Update the state of the chain in the database
if err := ab.onAcceptDB.Commit(); err != nil {
ab.vm.Ctx.Log.Error("unable to commit onAcceptDB")
return err
}
batch, err := ab.vm.DB.CommitBatch()
if err != nil {
ab.vm.Ctx.Log.Fatal("unable to commit vm's DB")
return err
}
defer ab.vm.DB.Abort()
if err := ab.Tx.Accept(batch); err != nil {
ab.vm.Ctx.Log.Error("unable to atomically commit block")
return err
}
for _, child := range ab.children {
@ -124,6 +129,7 @@ func (ab *AtomicBlock) Accept() {
}
ab.free()
return nil
}
// newAtomicBlock returns a new *AtomicBlock where the block's parent, a

View File

@ -129,10 +129,10 @@ type CommonBlock struct {
}
// Reject implements the snowman.Block interface
func (cb *CommonBlock) Reject() {
func (cb *CommonBlock) Reject() error {
defer cb.free() // remove this block from memory
cb.Block.Reject()
return cb.Block.Reject()
}
// free removes this block from memory
@ -213,17 +213,21 @@ type SingleDecisionBlock struct {
}
// Accept implements the snowman.Block interface
func (sdb *SingleDecisionBlock) Accept() {
func (sdb *SingleDecisionBlock) Accept() error {
sdb.VM.Ctx.Log.Verbo("Accepting block with ID %s", sdb.ID())
sdb.CommonBlock.Accept()
if err := sdb.CommonBlock.Accept(); err != nil {
return err
}
// Update the state of the chain in the database
if err := sdb.onAcceptDB.Commit(); err != nil {
sdb.vm.Ctx.Log.Warn("unable to commit onAcceptDB")
return err
}
if err := sdb.vm.DB.Commit(); err != nil {
sdb.vm.Ctx.Log.Warn("unable to commit vm's DB")
return err
}
for _, child := range sdb.children {
@ -234,6 +238,7 @@ func (sdb *SingleDecisionBlock) Accept() {
}
sdb.free()
return nil
}
// DoubleDecisionBlock contains the accept for a pair of blocks
@ -242,25 +247,31 @@ type DoubleDecisionBlock struct {
}
// Accept implements the snowman.Block interface
func (ddb *DoubleDecisionBlock) Accept() {
func (ddb *DoubleDecisionBlock) Accept() error {
ddb.VM.Ctx.Log.Verbo("Accepting block with ID %s", ddb.ID())
parent, ok := ddb.parentBlock().(*ProposalBlock)
if !ok {
ddb.vm.Ctx.Log.Error("double decision block should only follow a proposal block")
return
return errInvalidBlockType
}
parent.CommonBlock.Accept()
if err := parent.CommonBlock.Accept(); err != nil {
return err
}
ddb.CommonBlock.Accept()
if err := ddb.CommonBlock.Accept(); err != nil {
return err
}
// Update the state of the chain in the database
if err := ddb.onAcceptDB.Commit(); err != nil {
ddb.vm.Ctx.Log.Warn("unable to commit onAcceptDB")
return err
}
if err := ddb.vm.DB.Commit(); err != nil {
ddb.vm.Ctx.Log.Warn("unable to commit vm's DB")
return err
}
for _, child := range ddb.children {
@ -273,4 +284,5 @@ func (ddb *DoubleDecisionBlock) Accept() {
// remove this block and its parent from memory
parent.free()
ddb.free()
return nil
}

View File

@ -43,9 +43,10 @@ type ProposalBlock struct {
}
// Accept implements the snowman.Block interface
func (pb *ProposalBlock) Accept() {
func (pb *ProposalBlock) Accept() error {
pb.SetStatus(choices.Accepted)
pb.VM.LastAcceptedID = pb.ID()
return nil
}
// Initialize this block.

View File

@ -289,23 +289,23 @@ type BlockClient struct {
func (b *BlockClient) ID() ids.ID { return b.id }
// Accept ...
func (b *BlockClient) Accept() {
func (b *BlockClient) Accept() error {
delete(b.vm.blks, b.id.Key())
b.status = choices.Accepted
_, err := b.vm.client.BlockAccept(context.Background(), &vmproto.BlockAcceptRequest{
Id: b.id.Bytes(),
})
b.vm.ctx.Log.AssertNoError(err)
return err
}
// Reject ...
func (b *BlockClient) Reject() {
func (b *BlockClient) Reject() error {
delete(b.vm.blks, b.id.Key())
b.status = choices.Rejected
_, err := b.vm.client.BlockReject(context.Background(), &vmproto.BlockRejectRequest{
Id: b.id.Bytes(),
})
b.vm.ctx.Log.AssertNoError(err)
return err
}
// Status ...

View File

@ -46,7 +46,7 @@ type LiveBlock struct {
func (lb *LiveBlock) ID() ids.ID { return lb.block.id }
// Accept is called when this block is finalized as accepted by consensus
func (lb *LiveBlock) Accept() {
func (lb *LiveBlock) Accept() error {
bID := lb.ID()
lb.vm.ctx.Log.Debug("Accepted block %s", bID)
@ -55,7 +55,7 @@ func (lb *LiveBlock) Accept() {
if err := lb.db.Commit(); err != nil {
lb.vm.ctx.Log.Debug("Failed to accept block %s due to %s", bID, err)
return
return err
}
for _, child := range lb.children {
@ -74,15 +74,16 @@ func (lb *LiveBlock) Accept() {
if lb.vm.onAccept != nil {
lb.vm.onAccept(bID)
}
return nil
}
// Reject is called when this block is finalized as rejected by consensus
func (lb *LiveBlock) Reject() {
func (lb *LiveBlock) Reject() error {
lb.vm.ctx.Log.Debug("Rejected block %s", lb.ID())
if err := lb.vm.state.SetStatus(lb.vm.baseDB, lb.ID(), choices.Rejected); err != nil {
lb.vm.ctx.Log.Debug("Failed to reject block %s due to %s", lb.ID(), err)
return
return err
}
lb.status = choices.Rejected
@ -96,6 +97,7 @@ func (lb *LiveBlock) Reject() {
tx.onDecide(choices.Rejected)
}
}
return nil
}
// Status returns the current status of this block

View File

@ -80,17 +80,17 @@ func (tx *UniqueTx) addEvents(finalized func(choices.Status)) {
func (tx *UniqueTx) ID() ids.ID { return tx.txID }
// Accept is called when the transaction was finalized as accepted by consensus
func (tx *UniqueTx) Accept() {
func (tx *UniqueTx) Accept() error {
if err := tx.setStatus(choices.Accepted); err != nil {
tx.vm.ctx.Log.Error("Failed to accept tx %s due to %s", tx.txID, err)
return
return err
}
// Remove spent UTXOs
for _, utxoID := range tx.InputIDs().List() {
if err := tx.vm.state.SpendUTXO(utxoID); err != nil {
tx.vm.ctx.Log.Error("Failed to spend utxo %s due to %s", utxoID, err)
return
return err
}
}
@ -98,7 +98,7 @@ func (tx *UniqueTx) Accept() {
for _, utxo := range tx.utxos() {
if err := tx.vm.state.FundUTXO(utxo); err != nil {
tx.vm.ctx.Log.Error("Failed to fund utxo %s due to %s", utxoID, err)
return
return err
}
}
@ -110,16 +110,18 @@ func (tx *UniqueTx) Accept() {
if err := tx.vm.db.Commit(); err != nil {
tx.vm.ctx.Log.Error("Failed to commit accept %s due to %s", tx.txID, err)
return err
}
tx.t.deps = nil // Needed to prevent a memory leak
return nil
}
// Reject is called when the transaction was finalized as rejected by consensus
func (tx *UniqueTx) Reject() {
func (tx *UniqueTx) Reject() error {
if err := tx.setStatus(choices.Rejected); err != nil {
tx.vm.ctx.Log.Error("Failed to reject tx %s due to %s", tx.txID, err)
return
return err
}
tx.vm.ctx.Log.Debug("Rejecting Tx: %s", tx.ID())
@ -132,9 +134,11 @@ func (tx *UniqueTx) Reject() {
if err := tx.vm.db.Commit(); err != nil {
tx.vm.ctx.Log.Error("Failed to commit reject %s due to %s", tx.txID, err)
return err
}
tx.t.deps = nil // Needed to prevent a memory leak
return nil
}
// Status returns the current status of this transaction