2020-03-10 12:20:34 -07:00
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package avalanche
import (
2020-05-29 12:03:00 -07:00
"fmt"
2020-06-01 14:32:29 -07:00
"github.com/ava-labs/gecko/cache"
2020-03-10 12:20:34 -07:00
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/engine/common/queue"
"github.com/ava-labs/gecko/utils/formatting"
"github.com/prometheus/client_golang/prometheus"
)
// cacheSize is the number of recently processed vertex IDs kept in the
// bootstrapper's LRU cache, so that a vertex reached through several children
// is normally not walked again.
const cacheSize = 3000
2020-03-10 12:20:34 -07:00
// BootstrapConfig ...
type BootstrapConfig struct {
common . Config
// VtxBlocked tracks operations that are blocked on vertices
// TxBlocked tracks operations that are blocked on transactions
VtxBlocked , TxBlocked * queue . Jobs
State State
VM DAGVM
}
type bootstrapper struct {
BootstrapConfig
metrics
common . Bootstrapper
2020-06-04 08:10:40 -07:00
// true if all of the vertices in the original accepted frontier have been processed
processedStartingAcceptedFrontier bool
2020-06-02 13:20:48 -07:00
// number of vertices processed so far
2020-06-02 05:42:52 -07:00
numProcessed uint32
2020-06-01 14:32:29 -07:00
2020-06-04 07:54:26 -07:00
// tracks which validators were asked for which containers in which requests
2020-06-02 05:42:52 -07:00
outstandingRequests common . Requests
2020-05-22 14:41:47 -07:00
2020-06-02 13:20:48 -07:00
// Contains IDs of vertices that have recently been processed
2020-06-01 14:32:29 -07:00
processedCache * cache . LRU
2020-05-01 11:45:21 -07:00
2020-06-02 13:20:48 -07:00
// true if bootstrapping is done
finished bool
// Called when bootstrapping is done
2020-05-28 20:48:08 -07:00
onFinished func ( ) error
2020-03-10 12:20:34 -07:00
}
// Initialize this engine.
2020-05-28 20:48:08 -07:00
func ( b * bootstrapper ) Initialize ( config BootstrapConfig ) error {
2020-03-10 12:20:34 -07:00
b . BootstrapConfig = config
2020-06-01 14:32:29 -07:00
b . processedCache = & cache . LRU { Size : cacheSize }
2020-03-10 12:20:34 -07:00
b . VtxBlocked . SetParser ( & vtxParser {
2020-06-02 05:42:52 -07:00
log : config . Context . Log ,
2020-05-20 08:37:01 -07:00
numAccepted : b . numBSVtx ,
numDropped : b . numBSDroppedVtx ,
2020-03-10 12:20:34 -07:00
state : b . State ,
} )
b . TxBlocked . SetParser ( & txParser {
2020-06-02 05:42:52 -07:00
log : config . Context . Log ,
2020-05-20 08:37:01 -07:00
numAccepted : b . numBSTx ,
numDropped : b . numBSDroppedTx ,
2020-03-10 12:20:34 -07:00
vm : b . VM ,
} )
config . Bootstrapable = b
b . Bootstrapper . Initialize ( config . Config )
2020-05-28 20:48:08 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
// CurrentAcceptedFrontier returns the IDs of the vertices reported by
// State.Edge, as a freshly allocated set.
func (b *bootstrapper) CurrentAcceptedFrontier() ids.Set {
	frontier := ids.Set{}
	edge := b.State.Edge()
	frontier.Add(edge...)
	return frontier
}
// FilterAccepted returns the subset of containerIDs whose vertex can be
// loaded from State and has status Accepted. IDs that cannot be loaded are
// left out.
func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
	accepted := ids.Set{}
	for _, candidate := range containerIDs.List() {
		vtx, err := b.State.GetVertex(candidate)
		if err != nil || vtx.Status() != choices.Accepted {
			continue
		}
		accepted.Add(candidate)
	}
	return accepted
}
2020-06-03 16:04:29 -07:00
// Get vertex [vtxID] and its ancestors
2020-06-02 13:20:48 -07:00
func ( b * bootstrapper ) fetch ( vtxID ids . ID ) error {
2020-06-03 16:04:29 -07:00
// Make sure we haven't already requested this block
if b . outstandingRequests . Contains ( vtxID ) {
return nil
}
2020-06-02 05:42:52 -07:00
// Make sure we don't already have this vertex
if _ , err := b . State . GetVertex ( vtxID ) ; err == nil {
return nil
}
2020-06-01 14:32:29 -07:00
2020-06-02 05:42:52 -07:00
validators := b . BootstrapConfig . Validators . Sample ( 1 ) // validator to send request to
if len ( validators ) == 0 {
return fmt . Errorf ( "Dropping request for %s as there are no validators" , vtxID )
2020-03-10 12:20:34 -07:00
}
2020-06-02 05:42:52 -07:00
validatorID := validators [ 0 ] . ID ( )
b . RequestID ++
2020-06-02 13:20:48 -07:00
b . outstandingRequests . Add ( validatorID , b . RequestID , vtxID )
2020-06-02 05:42:52 -07:00
b . BootstrapConfig . Sender . GetAncestors ( validatorID , b . RequestID , vtxID ) // request vertex and ancestors
return nil
2020-06-01 14:32:29 -07:00
}
2020-06-02 05:42:52 -07:00
// Process vertices
func ( b * bootstrapper ) process ( vtx avalanche . Vertex ) error {
toProcess := [ ] avalanche . Vertex { vtx }
for len ( toProcess ) > 0 {
newLen := len ( toProcess ) - 1
vtx := toProcess [ newLen ]
toProcess = toProcess [ : newLen ]
if _ , ok := b . processedCache . Get ( vtx . ID ( ) ) ; ok { // already processed this
2020-06-01 14:32:29 -07:00
continue
}
2020-06-02 13:20:48 -07:00
b . numProcessed ++ // Progress tracker
if b . numProcessed % common . StatusUpdateFrequency == 0 {
b . BootstrapConfig . Context . Log . Info ( "processed %d vertices" , b . numProcessed )
}
2020-06-01 14:32:29 -07:00
2020-06-02 05:42:52 -07:00
switch vtx . Status ( ) {
case choices . Unknown :
2020-06-02 13:20:48 -07:00
if err := b . fetch ( vtx . ID ( ) ) ; err != nil {
2020-06-02 11:07:20 -07:00
return err
}
2020-06-02 05:42:52 -07:00
case choices . Rejected :
return fmt . Errorf ( "tried to accept %s even though it was previously rejected" , vtx . ID ( ) )
case choices . Processing :
if err := b . VtxBlocked . Push ( & vertexJob {
log : b . BootstrapConfig . Context . Log ,
numAccepted : b . numBSVtx ,
numDropped : b . numBSDroppedVtx ,
vtx : vtx ,
2020-06-01 14:32:29 -07:00
} ) ; err == nil {
2020-06-02 05:42:52 -07:00
b . numBSBlockedVtx . Inc ( )
2020-06-01 14:32:29 -07:00
} else {
2020-06-02 05:42:52 -07:00
b . BootstrapConfig . Context . Log . Verbo ( "couldn't push to vtxBlocked: %s" , err )
2020-06-01 14:32:29 -07:00
}
2020-06-02 05:42:52 -07:00
if err := b . VtxBlocked . Commit ( ) ; err != nil {
return err
}
for _ , tx := range vtx . Txs ( ) {
if err := b . TxBlocked . Push ( & txJob {
log : b . BootstrapConfig . Context . Log ,
numAccepted : b . numBSTx ,
numDropped : b . numBSDroppedTx ,
tx : tx ,
} ) ; err == nil {
b . numBSBlockedTx . Inc ( )
} else {
b . BootstrapConfig . Context . Log . Verbo ( "couldn't push to txBlocked: %s" , err )
}
}
if err := b . TxBlocked . Commit ( ) ; err != nil {
return err
2020-06-01 14:32:29 -07:00
}
2020-06-02 05:42:52 -07:00
for _ , parent := range vtx . Parents ( ) {
toProcess = append ( toProcess , parent )
}
2020-06-03 16:04:29 -07:00
b . processedCache . Put ( vtx . ID ( ) , nil )
2020-06-01 14:32:29 -07:00
}
2020-03-10 12:20:34 -07:00
}
2020-06-04 08:10:40 -07:00
if numPending := b . outstandingRequests . Len ( ) ; numPending == 0 && b . processedStartingAcceptedFrontier {
2020-06-02 05:42:52 -07:00
return b . finish ( )
}
return nil
2020-03-10 12:20:34 -07:00
}
2020-06-02 13:20:48 -07:00
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
2020-06-04 06:52:44 -07:00
// with request ID [requestID]. Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors.
2020-06-02 11:07:20 -07:00
func ( b * bootstrapper ) MultiPut ( vdr ids . ShortID , requestID uint32 , vtxs [ ] [ ] byte ) error {
2020-06-03 16:04:29 -07:00
if lenVtxs := len ( vtxs ) ; lenVtxs > common . MaxContainersPerMultiPut {
b . BootstrapConfig . Context . Log . Debug ( "MultiPut(%s, %d) contains more than maximum number of vertices" , vdr , requestID )
return b . GetAncestorsFailed ( vdr , requestID )
} else if lenVtxs == 0 {
b . BootstrapConfig . Context . Log . Debug ( "MultiPut(%s, %d) contains no vertices" , vdr , requestID )
return b . GetAncestorsFailed ( vdr , requestID )
}
2020-06-02 11:07:20 -07:00
// Make sure this is in response to a request we made
neededVtxID , needed := b . outstandingRequests . Remove ( vdr , requestID )
2020-06-03 16:04:29 -07:00
if ! needed { // this message isn't in response to a request we made
b . BootstrapConfig . Context . Log . Debug ( "received unexpected MultiPut from %s with ID %d" , vdr , requestID )
return nil
2020-06-02 11:07:20 -07:00
}
2020-06-03 16:04:29 -07:00
neededVtx , err := b . State . ParseVertex ( vtxs [ 0 ] ) // the vertex we requested
if err != nil {
b . BootstrapConfig . Context . Log . Debug ( "Failed to parse requested vertex %s: %w" , neededVtxID , err )
b . BootstrapConfig . Context . Log . Verbo ( "vertex: %s" , formatting . DumpBytes { Bytes : vtxs [ 0 ] } )
return b . fetch ( neededVtxID )
} else if actualID := neededVtx . ID ( ) ; ! actualID . Equals ( neededVtxID ) {
b . BootstrapConfig . Context . Log . Debug ( "expected the first block to be the requested block, %s, but is %s" , neededVtxID , actualID )
return b . fetch ( neededVtxID )
}
2020-06-04 06:52:44 -07:00
for _ , vtxBytes := range vtxs { // Parse/persist all the vertices
2020-06-03 16:04:29 -07:00
if _ , err := b . State . ParseVertex ( vtxBytes ) ; err != nil { // Persists the vtx
2020-06-02 11:07:20 -07:00
b . BootstrapConfig . Context . Log . Debug ( "Failed to parse vertex: %w" , err )
b . BootstrapConfig . Context . Log . Verbo ( "vertex: %s" , formatting . DumpBytes { Bytes : vtxBytes } )
}
}
return b . process ( neededVtx )
}
2020-06-03 16:04:29 -07:00
// GetAncestorsFailed is called when a GetAncestors message we sent fails
func ( b * bootstrapper ) GetAncestorsFailed ( vdr ids . ShortID , requestID uint32 ) error {
2020-06-01 14:32:29 -07:00
vtxID , ok := b . outstandingRequests . Remove ( vdr , requestID )
if ! ok {
2020-06-03 16:04:29 -07:00
b . BootstrapConfig . Context . Log . Debug ( "GetAncestorsFailed(%s, %d) called but there was no outstanding request to this validator with this ID" , vdr , requestID )
2020-06-01 14:32:29 -07:00
return nil
2020-03-24 16:08:53 -07:00
}
2020-06-03 16:04:29 -07:00
// Send another request for the vertex
2020-06-02 13:20:48 -07:00
return b . fetch ( vtxID )
2020-03-24 16:08:53 -07:00
}
2020-06-01 14:32:29 -07:00
// ForceAccepted ...
func ( b * bootstrapper ) ForceAccepted ( acceptedContainerIDs ids . Set ) error {
2020-06-06 22:08:53 -07:00
if err := b . VM . Bootstrapping ( ) ; err != nil {
return fmt . Errorf ( "failed to notify VM that bootstrapping has started: %w" ,
err )
}
2020-06-01 14:32:29 -07:00
for _ , vtxID := range acceptedContainerIDs . List ( ) {
2020-06-02 05:42:52 -07:00
if vtx , err := b . State . GetVertex ( vtxID ) ; err == nil {
2020-06-03 16:04:29 -07:00
if err := b . process ( vtx ) ; err != nil {
return err
}
2020-06-02 13:20:48 -07:00
} else if err := b . fetch ( vtxID ) ; err != nil {
2020-06-02 05:42:52 -07:00
return err
2020-03-10 12:20:34 -07:00
}
}
2020-06-04 08:10:40 -07:00
b . processedStartingAcceptedFrontier = true
2020-03-10 12:20:34 -07:00
2020-06-02 05:42:52 -07:00
if numPending := b . outstandingRequests . Len ( ) ; numPending == 0 {
return b . finish ( )
}
2020-05-29 12:03:00 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-06-01 14:32:29 -07:00
// Finish bootstrapping
2020-06-02 05:42:52 -07:00
func ( b * bootstrapper ) finish ( ) error {
2020-03-10 12:20:34 -07:00
if b . finished {
2020-06-02 05:42:52 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-06-02 13:20:48 -07:00
b . BootstrapConfig . Context . Log . Info ( "finished fetching vertices. executing state transitions..." )
2020-03-10 12:20:34 -07:00
2020-06-02 05:42:52 -07:00
if err := b . executeAll ( b . TxBlocked , b . numBSBlockedTx ) ; err != nil {
return err
}
if err := b . executeAll ( b . VtxBlocked , b . numBSBlockedVtx ) ; err != nil {
return err
}
2020-03-10 12:20:34 -07:00
2020-06-01 20:24:21 -07:00
if err := b . VM . Bootstrapped ( ) ; err != nil {
return fmt . Errorf ( "failed to notify VM that bootstrapping has finished: %w" ,
err )
}
2020-03-10 12:20:34 -07:00
// Start consensus
2020-06-02 05:42:52 -07:00
if err := b . onFinished ( ) ; err != nil {
return err
}
2020-03-10 12:20:34 -07:00
b . finished = true
2020-06-02 05:42:52 -07:00
return nil
2020-03-10 12:20:34 -07:00
}
2020-06-02 05:42:52 -07:00
func ( b * bootstrapper ) executeAll ( jobs * queue . Jobs , numBlocked prometheus . Gauge ) error {
2020-03-10 12:20:34 -07:00
for job , err := jobs . Pop ( ) ; err == nil ; job , err = jobs . Pop ( ) {
numBlocked . Dec ( )
2020-04-20 17:00:36 -07:00
b . BootstrapConfig . Context . Log . Debug ( "Executing: %s" , job . ID ( ) )
2020-03-10 12:20:34 -07:00
if err := jobs . Execute ( job ) ; err != nil {
2020-06-02 05:42:52 -07:00
b . BootstrapConfig . Context . Log . Error ( "Error executing: %s" , err )
return err
}
if err := jobs . Commit ( ) ; err != nil {
return err
2020-05-29 12:03:00 -07:00
}
2020-03-10 12:20:34 -07:00
}
2020-06-02 05:42:52 -07:00
return nil
2020-03-10 12:20:34 -07:00
}