Merge pull request #49 from ava-labs/bootstrap-improvements

Bootstrap improvements
Stephen Buttolph, 2020-06-07 01:30:59 -04:00, committed by GitHub
commit d3b2089ee2
29 changed files with 1490 additions and 1223 deletions

go.mod (2 changes)

@ -7,7 +7,7 @@ require (
github.com/allegro/bigcache v1.2.1 // indirect
github.com/aristanetworks/goarista v0.0.0-20200520141224-0f14e646773f // indirect
github.com/ava-labs/coreth v0.2.0 // Added manually; don't delete
github.com/ava-labs/go-ethereum v1.9.3
github.com/ava-labs/go-ethereum v1.9.3 // indirect
github.com/deckarep/golang-set v1.7.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1 v1.0.3
github.com/decred/dcrd/dcrec/secp256k1/v3 v3.0.0-20200526030155-0c6c7ca85d3b


@ -89,6 +89,15 @@ func (m Builder) Get(chainID ids.ID, requestID uint32, containerID ids.ID) (Msg,
})
}
// GetAncestors message
func (m Builder) GetAncestors(chainID ids.ID, requestID uint32, containerID ids.ID) (Msg, error) {
return m.Pack(GetAncestors, map[Field]interface{}{
ChainID: chainID.Bytes(),
RequestID: requestID,
ContainerID: containerID.Bytes(),
})
}
// Put message
func (m Builder) Put(chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) (Msg, error) {
return m.Pack(Put, map[Field]interface{}{
@ -99,6 +108,15 @@ func (m Builder) Put(chainID ids.ID, requestID uint32, containerID ids.ID, conta
})
}
// MultiPut message
func (m Builder) MultiPut(chainID ids.ID, requestID uint32, containers [][]byte) (Msg, error) {
return m.Pack(MultiPut, map[Field]interface{}{
ChainID: chainID.Bytes(),
RequestID: requestID,
MultiContainerBytes: containers,
})
}
// PushQuery message
func (m Builder) PushQuery(chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) (Msg, error) {
return m.Pack(PushQuery, map[Field]interface{}{


@ -12,17 +12,18 @@ type Field uint32
// Fields that may be packed. These values are not sent over the wire.
const (
VersionStr Field = iota // Used in handshake
NetworkID // Used in handshake
NodeID // Used in handshake
MyTime // Used in handshake
IP // Used in handshake
Peers // Used in handshake
ChainID // Used for dispatching
RequestID // Used for all messages
ContainerID // Used for querying
ContainerBytes // Used for gossiping
ContainerIDs // Used for querying
VersionStr Field = iota // Used in handshake
NetworkID // Used in handshake
NodeID // Used in handshake
MyTime // Used in handshake
IP // Used in handshake
Peers // Used in handshake
ChainID // Used for dispatching
RequestID // Used for all messages
ContainerID // Used for querying
ContainerBytes // Used for gossiping
ContainerIDs // Used for querying
MultiContainerBytes // Used in MultiPut
)
// Packer returns the packer function that can be used to pack this field.
@ -50,6 +51,8 @@ func (f Field) Packer() func(*wrappers.Packer, interface{}) {
return wrappers.TryPackBytes
case ContainerIDs:
return wrappers.TryPackHashes
case MultiContainerBytes:
return wrappers.TryPack2DBytes
default:
return nil
}
@ -80,6 +83,8 @@ func (f Field) Unpacker() func(*wrappers.Packer) interface{} {
return wrappers.TryUnpackBytes
case ContainerIDs:
return wrappers.TryUnpackHashes
case MultiContainerBytes:
return wrappers.TryUnpack2DBytes
default:
return nil
}
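The new MultiContainerBytes field rides on a pair of 2D-byte helpers. Below is a minimal, self-contained sketch of the kind of length-prefixed layout TryPack2DBytes/TryUnpack2DBytes presumably implement; the exact wire format here is an assumption, not taken from the gecko wrappers package.

package main

import (
	"encoding/binary"
	"fmt"
)

// pack2DBytes writes a container count, then each container as a
// length-prefixed blob (hypothetical layout).
func pack2DBytes(containers [][]byte) []byte {
	var n [4]byte
	binary.BigEndian.PutUint32(n[:], uint32(len(containers)))
	out := append([]byte{}, n[:]...)
	for _, c := range containers {
		binary.BigEndian.PutUint32(n[:], uint32(len(c)))
		out = append(out, n[:]...)
		out = append(out, c...)
	}
	return out
}

// unpack2DBytes reverses pack2DBytes.
func unpack2DBytes(b []byte) ([][]byte, error) {
	if len(b) < 4 {
		return nil, fmt.Errorf("short buffer")
	}
	count := binary.BigEndian.Uint32(b)
	b = b[4:]
	containers := make([][]byte, 0, count)
	for i := uint32(0); i < count; i++ {
		if len(b) < 4 {
			return nil, fmt.Errorf("short container header")
		}
		size := binary.BigEndian.Uint32(b)
		b = b[4:]
		if uint32(len(b)) < size {
			return nil, fmt.Errorf("truncated container")
		}
		containers = append(containers, b[:size])
		b = b[size:]
	}
	return containers, nil
}

func main() {
	enc := pack2DBytes([][]byte{{0x01}, {0x02, 0x03}})
	dec, err := unpack2DBytes(enc)
	fmt.Println(dec, err) // [[1] [2 3]] <nil>
}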
@ -107,6 +112,8 @@ func (f Field) String() string {
return "Container Bytes"
case ContainerIDs:
return "Container IDs"
case MultiContainerBytes:
return "MultiContainerBytes"
default:
return "Unknown Field"
}
@ -135,8 +142,12 @@ func (op Op) String() string {
return "accepted"
case Get:
return "get"
case GetAncestors:
return "get_ancestors"
case Put:
return "put"
case MultiPut:
return "multi_put"
case PushQuery:
return "push_query"
case PullQuery:
@ -166,6 +177,11 @@ const (
PushQuery
PullQuery
Chits
// Bootstrapping:
// TODO: Move GetAncestors and MultiPut with the rest of the bootstrapping
// commands when we do non-backwards compatible upgrade
GetAncestors
MultiPut
)
// Defines the messages that can be sent/received with this network
@ -181,6 +197,8 @@ var (
AcceptedFrontier: []Field{ChainID, RequestID, ContainerIDs},
GetAccepted: []Field{ChainID, RequestID, ContainerIDs},
Accepted: []Field{ChainID, RequestID, ContainerIDs},
GetAncestors: []Field{ChainID, RequestID, ContainerID},
MultiPut: []Field{ChainID, RequestID, MultiContainerBytes},
// Consensus:
Get: []Field{ChainID, RequestID, ContainerID},
Put: []Field{ChainID, RequestID, ContainerID, ContainerBytes},


@ -56,7 +56,7 @@ type metrics struct {
getPeerlist, peerlist,
getAcceptedFrontier, acceptedFrontier,
getAccepted, accepted,
get, put,
get, getAncestors, put, multiPut,
pushQuery, pullQuery, chits messageMetrics
}
@ -83,7 +83,9 @@ func (m *metrics) initialize(registerer prometheus.Registerer) error {
errs.Add(m.getAccepted.initialize(GetAccepted, registerer))
errs.Add(m.accepted.initialize(Accepted, registerer))
errs.Add(m.get.initialize(Get, registerer))
errs.Add(m.getAncestors.initialize(GetAncestors, registerer))
errs.Add(m.put.initialize(Put, registerer))
errs.Add(m.multiPut.initialize(MultiPut, registerer))
errs.Add(m.pushQuery.initialize(PushQuery, registerer))
errs.Add(m.pullQuery.initialize(PullQuery, registerer))
errs.Add(m.chits.initialize(Chits, registerer))
@ -111,8 +113,12 @@ func (m *metrics) message(msgType Op) *messageMetrics {
return &m.accepted
case Get:
return &m.get
case GetAncestors:
return &m.getAncestors
case Put:
return &m.put
case MultiPut:
return &m.multiPut
case PushQuery:
return &m.pushQuery
case PullQuery:


@ -30,7 +30,7 @@ import (
const (
defaultInitialReconnectDelay = time.Second
defaultMaxReconnectDelay = time.Hour
defaultMaxMessageSize uint32 = 1 << 21
DefaultMaxMessageSize uint32 = 1 << 21
defaultSendQueueSize = 1 << 10
defaultMaxClockDifference = time.Minute
defaultPeerListGossipSpacing = time.Minute
@ -162,7 +162,7 @@ func NewDefaultNetwork(
router,
defaultInitialReconnectDelay,
defaultMaxReconnectDelay,
defaultMaxMessageSize,
DefaultMaxMessageSize,
defaultSendQueueSize,
defaultMaxClockDifference,
defaultPeerListGossipSpacing,
@ -359,6 +359,29 @@ func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32,
}
}
// GetAncestors implements the Sender interface.
func (n *network) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.GetAncestors(chainID, requestID, containerID)
if err != nil {
n.log.Error("failed to build GetAncestors message: %w", err)
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.getAncestors.numFailed.Inc()
n.log.Debug("failed to send a GetAncestors message to: %s", validatorID)
} else {
n.getAncestors.numSent.Inc()
}
}
// Put implements the Sender interface.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.Put(chainID, requestID, containerID, container)
@ -382,6 +405,29 @@ func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32,
}
}
// MultiPut implements the Sender interface.
func (n *network) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
msg, err := n.b.MultiPut(chainID, requestID, containers)
if err != nil {
n.log.Error("failed to build MultiPut message because of container of size %d", len(containers))
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a MultiPut message to: %s", validatorID)
n.multiPut.numFailed.Inc()
} else {
n.multiPut.numSent.Inc()
}
}
// PushQuery implements the Sender interface.
func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.PushQuery(chainID, requestID, containerID, container)


@ -201,7 +201,7 @@ func (p *peer) handle(msg Msg) {
op := msg.Op()
msgMetrics := p.net.message(op)
if msgMetrics == nil {
p.net.log.Debug("dropping an unknown message from %s with op %d", p.id, op)
p.net.log.Debug("dropping an unknown message from %s with op %s", p.id, op.String())
return
}
msgMetrics.numReceived.Inc()
@ -236,14 +236,20 @@ func (p *peer) handle(msg Msg) {
p.accepted(msg)
case Get:
p.get(msg)
case GetAncestors:
p.getAncestors(msg)
case Put:
p.put(msg)
case MultiPut:
p.multiPut(msg)
case PushQuery:
p.pushQuery(msg)
case PullQuery:
p.pullQuery(msg)
case Chits:
p.chits(msg)
default:
p.net.log.Debug("dropping an unknown message from %s with op %s", p.id, op.String())
}
}
@ -537,6 +543,16 @@ func (p *peer) get(msg Msg) {
p.net.router.Get(p.id, chainID, requestID, containerID)
}
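// assumes the stateLock is not held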
func (p *peer) getAncestors(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
requestID := msg.Get(RequestID).(uint32)
containerID, err := ids.ToID(msg.Get(ContainerID).([]byte))
p.net.log.AssertNoError(err)
p.net.router.GetAncestors(p.id, chainID, requestID, containerID)
}
// assumes the stateLock is not held
func (p *peer) put(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
@ -549,6 +565,16 @@ func (p *peer) put(msg Msg) {
p.net.router.Put(p.id, chainID, requestID, containerID, container)
}
// assumes the stateLock is not held
func (p *peer) multiPut(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
requestID := msg.Get(RequestID).(uint32)
containers := msg.Get(MultiContainerBytes).([][]byte)
p.net.router.MultiPut(p.id, chainID, requestID, containers)
}
// assumes the stateLock is not held
func (p *peer) pushQuery(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))


@ -6,6 +6,7 @@ package avalanche
import (
"fmt"
"github.com/ava-labs/gecko/cache"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
@ -15,6 +16,10 @@ import (
"github.com/prometheus/client_golang/prometheus"
)
const (
cacheSize = 3000
)
// BootstrapConfig ...
type BootstrapConfig struct {
common.Config
@ -32,24 +37,29 @@ type bootstrapper struct {
metrics
common.Bootstrapper
// IDs of vertices that we're already in the process of getting
// TODO: Find a better way to track; this keeps every single vertex's ID in memory when bootstrapping from nothing
seen ids.Set
// true if all of the vertices in the original accepted frontier have been processed
processedStartingAcceptedFrontier bool
numFetched uint64 // number of vertices that have been fetched from validators
// number of vertices processed so far
numProcessed uint32
// vtxReqs prevents asking validators for the same vertex
vtxReqs common.Requests
// tracks which validators were asked for which containers in which requests
outstandingRequests common.Requests
// IDs of vertices that we have requested from other validators but haven't received
pending ids.Set
finished bool
// Contains IDs of vertices that have recently been processed
processedCache *cache.LRU
// true if bootstrapping is done
finished bool
// Called when bootstrapping is done
onFinished func() error
}
// Initialize this engine.
func (b *bootstrapper) Initialize(config BootstrapConfig) error {
b.BootstrapConfig = config
b.processedCache = &cache.LRU{Size: cacheSize}
b.VtxBlocked.SetParser(&vtxParser{
log: config.Context.Log,
@ -88,123 +98,53 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
return acceptedVtxIDs
}
// ForceAccepted ...
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
if err := b.VM.Bootstrapping(); err != nil {
return fmt.Errorf("failed to notify VM that bootstrapping has started: %w",
err)
}
for _, vtxID := range acceptedContainerIDs.List() {
if err := b.fetch(vtxID); err != nil {
return err
}
}
if numPending := b.pending.Len(); numPending == 0 {
// TODO: This typically indicates bootstrapping has failed, so this
// should be handled appropriately
return b.finish()
}
return nil
}
// Put ...
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
vtx, err := b.State.ParseVertex(vtxBytes)
if err != nil {
b.BootstrapConfig.Context.Log.Debug("ParseVertex failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: vtxBytes})
return b.GetFailed(vdr, requestID)
}
if !b.pending.Contains(vtx.ID()) {
b.BootstrapConfig.Context.Log.Debug("Validator %s sent an unrequested vertex:\n%s",
vdr,
formatting.DumpBytes{Bytes: vtxBytes})
return b.GetFailed(vdr, requestID)
}
return b.addVertex(vtx)
}
// GetFailed ...
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
vtxID, ok := b.vtxReqs.Remove(vdr, requestID)
if !ok {
b.BootstrapConfig.Context.Log.Debug("GetFailed called without sending the corresponding Get message from %s",
vdr)
return nil
}
b.sendRequest(vtxID)
return nil
}
// Get vertex [vtxID] and its ancestors
func (b *bootstrapper) fetch(vtxID ids.ID) error {
if b.pending.Contains(vtxID) {
// Make sure we haven't already requested this block
if b.outstandingRequests.Contains(vtxID) {
return nil
}
vtx, err := b.State.GetVertex(vtxID)
if err != nil {
b.sendRequest(vtxID)
// Make sure we don't already have this vertex
if _, err := b.State.GetVertex(vtxID); err == nil {
return nil
}
return b.storeVertex(vtx)
}
func (b *bootstrapper) sendRequest(vtxID ids.ID) {
validators := b.BootstrapConfig.Validators.Sample(1)
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
if len(validators) == 0 {
b.BootstrapConfig.Context.Log.Error("Dropping request for %s as there are no validators", vtxID)
return
return fmt.Errorf("Dropping request for %s as there are no validators", vtxID)
}
validatorID := validators[0].ID()
b.RequestID++
b.vtxReqs.RemoveAny(vtxID)
b.vtxReqs.Add(validatorID, b.RequestID, vtxID)
b.pending.Add(vtxID)
b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, vtxID)
b.numBSPendingRequests.Set(float64(b.pending.Len()))
}
func (b *bootstrapper) addVertex(vtx avalanche.Vertex) error {
if err := b.storeVertex(vtx); err != nil {
return err
}
if numPending := b.pending.Len(); numPending == 0 {
return b.finish()
}
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
return nil
}
func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) error {
vts := []avalanche.Vertex{vtx}
b.numFetched++
if b.numFetched%2500 == 0 { // periodically inform user of progress
b.BootstrapConfig.Context.Log.Info("bootstrapping has fetched %d vertices", b.numFetched)
}
// Process vertices
func (b *bootstrapper) process(vtx avalanche.Vertex) error {
toProcess := []avalanche.Vertex{vtx}
for len(toProcess) > 0 {
newLen := len(toProcess) - 1
vtx := toProcess[newLen]
toProcess = toProcess[:newLen]
if _, ok := b.processedCache.Get(vtx.ID()); ok { // already processed this
continue
}
b.numProcessed++ // Progress tracker
if b.numProcessed%common.StatusUpdateFrequency == 0 {
b.BootstrapConfig.Context.Log.Info("processed %d vertices", b.numProcessed)
}
for len(vts) > 0 {
newLen := len(vts) - 1
vtx := vts[newLen]
vts = vts[:newLen]
vtxID := vtx.ID()
switch status := vtx.Status(); status {
switch vtx.Status() {
case choices.Unknown:
b.sendRequest(vtxID)
if err := b.fetch(vtx.ID()); err != nil {
return err
}
case choices.Rejected:
return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID())
case choices.Processing:
b.pending.Remove(vtxID)
if err := b.VtxBlocked.Push(&vertexJob{
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSVtx,
@ -213,7 +153,7 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) error {
}); err == nil {
b.numBSBlockedVtx.Inc()
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked")
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked: %s", err)
}
if err := b.VtxBlocked.Commit(); err != nil {
return err
@ -227,35 +167,103 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) error {
}); err == nil {
b.numBSBlockedTx.Inc()
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked")
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked: %s", err)
}
}
if err := b.TxBlocked.Commit(); err != nil {
return err
}
for _, parent := range vtx.Parents() {
if parentID := parent.ID(); !b.seen.Contains(parentID) {
b.seen.Add(parentID)
vts = append(vts, parent)
}
toProcess = append(toProcess, parent)
}
case choices.Accepted:
b.BootstrapConfig.Context.Log.Verbo("bootstrapping confirmed %s", vtxID)
case choices.Rejected:
return fmt.Errorf("bootstrapping wants to accept %s, however it was previously rejected", vtxID)
b.processedCache.Put(vtx.ID(), nil)
}
}
numPending := b.pending.Len()
b.numBSPendingRequests.Set(float64(numPending))
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
return b.finish()
}
return nil
}
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
// with request ID [requestID]. Expects vtxs[0] to be the vertex requested in the corresponding GetAncestors.
func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte) error {
if lenVtxs := len(vtxs); lenVtxs > common.MaxContainersPerMultiPut {
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains more than maximum number of vertices", vdr, requestID)
return b.GetAncestorsFailed(vdr, requestID)
} else if lenVtxs == 0 {
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains no vertices", vdr, requestID)
return b.GetAncestorsFailed(vdr, requestID)
}
// Make sure this is in response to a request we made
neededVtxID, needed := b.outstandingRequests.Remove(vdr, requestID)
if !needed { // this message isn't in response to a request we made
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
return nil
}
neededVtx, err := b.State.ParseVertex(vtxs[0]) // the vertex we requested
if err != nil {
b.BootstrapConfig.Context.Log.Debug("Failed to parse requested vertex %s: %w", neededVtxID, err)
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxs[0]})
return b.fetch(neededVtxID)
} else if actualID := neededVtx.ID(); !actualID.Equals(neededVtxID) {
b.BootstrapConfig.Context.Log.Debug("expected the first block to be the requested block, %s, but is %s", neededVtxID, actualID)
return b.fetch(neededVtxID)
}
for _, vtxBytes := range vtxs { // Parse/persist all the vertices
if _, err := b.State.ParseVertex(vtxBytes); err != nil { // Persists the vtx
b.BootstrapConfig.Context.Log.Debug("Failed to parse vertex: %w", err)
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
}
}
return b.process(neededVtx)
}
// GetAncestorsFailed is called when a GetAncestors message we sent fails
func (b *bootstrapper) GetAncestorsFailed(vdr ids.ShortID, requestID uint32) error {
vtxID, ok := b.outstandingRequests.Remove(vdr, requestID)
if !ok {
b.BootstrapConfig.Context.Log.Debug("GetAncestorsFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
return nil
}
// Send another request for the vertex
return b.fetch(vtxID)
}
// ForceAccepted ...
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
if err := b.VM.Bootstrapping(); err != nil {
return fmt.Errorf("failed to notify VM that bootstrapping has started: %w",
err)
}
for _, vtxID := range acceptedContainerIDs.List() {
if vtx, err := b.State.GetVertex(vtxID); err == nil {
if err := b.process(vtx); err != nil {
return err
}
} else if err := b.fetch(vtxID); err != nil {
return err
}
}
b.processedStartingAcceptedFrontier = true
if numPending := b.outstandingRequests.Len(); numPending == 0 {
return b.finish()
}
return nil
}
// Finish bootstrapping
func (b *bootstrapper) finish() error {
if b.finished {
return nil
}
b.BootstrapConfig.Context.Log.Info("bootstrapping finished fetching vertices. executing state transitions...")
b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing state transitions...")
if err := b.executeAll(b.TxBlocked, b.numBSBlockedTx); err != nil {
return err
@ -273,7 +281,6 @@ func (b *bootstrapper) finish() error {
if err := b.onFinished(); err != nil {
return err
}
b.seen = ids.Set{}
b.finished = true
return nil
}

File diff suppressed because it is too large.


@ -4,8 +4,12 @@
package avalanche
import (
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/network"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
"github.com/ava-labs/gecko/snow/engine/common"
@ -15,6 +19,12 @@ import (
"github.com/ava-labs/gecko/utils/wrappers"
)
const (
// TODO define this constant in one place rather than here and in snowman
// Max total size, in bytes, of the containers in a MultiPut message
maxContainersLen = int(4 * network.DefaultMaxMessageSize / 5)
)
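With DefaultMaxMessageSize = 1 << 21 (2,097,152 bytes), this works out to 4 * 2,097,152 / 5 = 1,677,721 bytes, reserving roughly a fifth of the message budget for framing and the per-container length prefixes.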
// Transitive implements the Engine interface by attempting to fetch all
// transitive dependencies.
type Transitive struct {
@ -40,7 +50,7 @@ type Transitive struct {
// Initialize implements the Engine interface
func (t *Transitive) Initialize(config Config) error {
config.Context.Log.Info("Initializing Avalanche consensus")
config.Context.Log.Info("Initializing consensus engine")
t.Config = config
t.metrics.Initialize(config.Context.Log, config.Params.Namespace, config.Params.Metrics)
@ -61,13 +71,13 @@ func (t *Transitive) finishBootstrapping() error {
if vtx, err := t.Config.State.GetVertex(vtxID); err == nil {
frontier = append(frontier, vtx)
} else {
t.Config.Context.Log.Error("Vertex %s failed to be loaded from the frontier with %s", vtxID, err)
t.Config.Context.Log.Error("vertex %s failed to be loaded from the frontier with %s", vtxID, err)
}
}
t.Consensus.Initialize(t.Config.Context, t.Params, frontier)
t.bootstrapped = true
t.Config.Context.Log.Info("Bootstrapping finished with %d vertices in the accepted frontier", len(frontier))
t.Config.Context.Log.Info("bootstrapping finished with %d vertices in the accepted frontier", len(frontier))
return nil
}
@ -75,7 +85,7 @@ func (t *Transitive) finishBootstrapping() error {
func (t *Transitive) Gossip() error {
edge := t.Config.State.Edge()
if len(edge) == 0 {
t.Config.Context.Log.Debug("Dropping gossip request as no vertices have been accepted")
t.Config.Context.Log.Verbo("dropping gossip request as no vertices have been accepted")
return nil
}
@ -83,18 +93,18 @@ func (t *Transitive) Gossip() error {
vtxID := edge[sampler.Sample()]
vtx, err := t.Config.State.GetVertex(vtxID)
if err != nil {
t.Config.Context.Log.Warn("Dropping gossip request as %s couldn't be loaded due to %s", vtxID, err)
t.Config.Context.Log.Warn("dropping gossip request as %s couldn't be loaded due to: %s", vtxID, err)
return nil
}
t.Config.Context.Log.Debug("Gossiping %s as accepted to the network", vtxID)
t.Config.Context.Log.Verbo("gossiping %s as accepted to the network", vtxID)
t.Config.Sender.Gossip(vtxID, vtx.Bytes())
return nil
}
// Shutdown implements the Engine interface
func (t *Transitive) Shutdown() error {
t.Config.Context.Log.Info("Shutting down Avalanche consensus")
t.Config.Context.Log.Info("shutting down consensus engine")
return t.Config.VM.Shutdown()
}
@ -110,19 +120,63 @@ func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error
return nil
}
// GetAncestors implements the Engine interface
func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
startTime := time.Now()
t.Config.Context.Log.Verbo("GetAncestors(%s, %d, %s) called", vdr, requestID, vtxID)
vertex, err := t.Config.State.GetVertex(vtxID)
if err != nil || vertex.Status() == choices.Unknown {
t.Config.Context.Log.Verbo("dropping getAncestors")
return nil // Don't have the requested vertex. Drop message.
}
queue := make([]avalanche.Vertex, 1, common.MaxContainersPerMultiPut) // for BFS
queue[0] = vertex
ancestorsBytesLen := 0 // length, in bytes, of vertex and its ancestors
ancestorsBytes := make([][]byte, 0, common.MaxContainersPerMultiPut) // vertex and its ancestors in BFS order
visited := ids.Set{} // IDs of vertices that have been in queue before
visited.Add(vertex.ID())
for len(ancestorsBytes) < common.MaxContainersPerMultiPut && len(queue) > 0 && time.Since(startTime) < common.MaxTimeFetchingAncestors {
var vtx avalanche.Vertex
vtx, queue = queue[0], queue[1:] // pop
vtxBytes := vtx.Bytes()
// Ensure response size isn't too large. Include wrappers.IntLen because each
// container is prefixed with its size, and the size is represented by an int.
if newLen := wrappers.IntLen + ancestorsBytesLen + len(vtxBytes); newLen < maxContainersLen {
ancestorsBytes = append(ancestorsBytes, vtxBytes)
ancestorsBytesLen = newLen
} else { // reached maximum response size
break
}
for _, parent := range vtx.Parents() {
if parent.Status() == choices.Unknown { // Don't have this vertex; ignore
continue
}
if parentID := parent.ID(); !visited.Contains(parentID) { // If already visited, ignore
queue = append(queue, parent)
visited.Add(parentID)
}
}
}
t.Config.Sender.MultiPut(vdr, requestID, ancestorsBytes)
return nil
}
// Put implements the Engine interface
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
t.Config.Context.Log.Verbo("Put called for vertexID %s", vtxID)
t.Config.Context.Log.Verbo("Put(%s, %d, %s) called", vdr, requestID, vtxID)
if !t.bootstrapped {
return t.bootstrapper.Put(vdr, requestID, vtxID, vtxBytes)
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
return nil
}
vtx, err := t.Config.State.ParseVertex(vtxBytes)
if err != nil {
t.Config.Context.Log.Debug("ParseVertex failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: vtxBytes})
t.Config.Context.Log.Debug("failed to parse vertex %s due to: %s", vtxID, err)
t.Config.Context.Log.Verbo("vertex:\n%s", formatting.DumpBytes{Bytes: vtxBytes})
return t.GetFailed(vdr, requestID)
}
_, err = t.insertFrom(vdr, vtx)
@ -131,14 +185,14 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxByt
// GetFailed implements the Engine interface
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
if !t.bootstrapped {
return t.bootstrapper.GetFailed(vdr, requestID)
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
t.Config.Context.Log.Debug("dropping GetFailed(%s, %d) due to bootstrapping", vdr, requestID)
return nil
}
vtxID, ok := t.vtxReqs.Remove(vdr, requestID)
if !ok {
t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s",
vdr)
t.Config.Context.Log.Debug("GetFailed(%s, %d) called without having sent corresponding Get", vdr, requestID)
return nil
}
@ -160,7 +214,7 @@ func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
// PullQuery implements the Engine interface
func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", vtxID)
t.Config.Context.Log.Debug("dropping PullQuery(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
return nil
}
@ -188,15 +242,14 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID)
// PushQuery implements the Engine interface
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", vtxID)
t.Config.Context.Log.Debug("dropping PushQuery(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID)
return nil
}
vtx, err := t.Config.State.ParseVertex(vtxBytes)
if err != nil {
t.Config.Context.Log.Warn("ParseVertex failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: vtxBytes})
t.Config.Context.Log.Debug("failed to parse vertex %s due to: %s", vtxID, err)
t.Config.Context.Log.Verbo("vertex:\n%s", formatting.DumpBytes{Bytes: vtxBytes})
return nil
}
@ -210,7 +263,7 @@ func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID,
// Chits implements the Engine interface
func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) error {
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping Chits due to bootstrapping")
t.Config.Context.Log.Debug("dropping Chits(%s, %d) due to bootstrapping", vdr, requestID)
return nil
}
@ -241,7 +294,7 @@ func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) error {
// Notify implements the Engine interface
func (t *Transitive) Notify(msg common.Message) error {
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping Notify due to bootstrapping")
t.Config.Context.Log.Debug("dropping Notify due to bootstrapping")
return nil
}
@ -345,7 +398,7 @@ func (t *Transitive) insert(vtx avalanche.Vertex) error {
}
}
t.Config.Context.Log.Verbo("Vertex: %s is blocking on %d vertices and %d transactions", vtxID, i.vtxDeps.Len(), i.txDeps.Len())
t.Config.Context.Log.Verbo("vertex %s is blocking on %d vertices and %d transactions", vtxID, i.vtxDeps.Len(), i.txDeps.Len())
t.vtxBlocked.Register(&vtxIssuer{i: i})
t.txBlocked.Register(&txIssuer{i: i})
@ -403,7 +456,7 @@ func (t *Transitive) issueRepoll() {
preferredIDs := t.Consensus.Preferences().List()
numPreferredIDs := len(preferredIDs)
if numPreferredIDs == 0 {
t.Config.Context.Log.Error("Re-query attempt was dropped due to no pending vertices")
t.Config.Context.Log.Error("re-query attempt was dropped due to no pending vertices")
return
}
@ -422,12 +475,12 @@ func (t *Transitive) issueRepoll() {
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PullQuery(vdrSet, t.RequestID, vtxID)
} else if numVdrs < p.K {
t.Config.Context.Log.Error("Re-query for %s was dropped due to an insufficient number of validators", vtxID)
t.Config.Context.Log.Error("re-query for %s was dropped due to an insufficient number of validators", vtxID)
}
}
func (t *Transitive) issueBatch(txs []snowstorm.Tx) error {
t.Config.Context.Log.Verbo("Batching %d transactions into a new vertex", len(txs))
t.Config.Context.Log.Verbo("batching %d transactions into a new vertex", len(txs))
virtuousIDs := t.Consensus.Virtuous().List()
sampler := random.Uniform{N: len(virtuousIDs)}
@ -438,7 +491,7 @@ func (t *Transitive) issueBatch(txs []snowstorm.Tx) error {
vtx, err := t.Config.State.BuildVertex(parentIDs, txs)
if err != nil {
t.Config.Context.Log.Warn("Error building new vertex with %d parents and %d transactions", len(parentIDs), len(txs))
t.Config.Context.Log.Warn("error building new vertex with %d parents and %d transactions", len(parentIDs), len(txs))
return nil
}
return t.insert(vtx)
@ -446,7 +499,7 @@ func (t *Transitive) issueBatch(txs []snowstorm.Tx) error {
func (t *Transitive) sendRequest(vdr ids.ShortID, vtxID ids.ID) {
if t.vtxReqs.Contains(vtxID) {
t.Config.Context.Log.Debug("Not requesting a vertex because we have recently sent a request")
t.Config.Context.Log.Debug("not requesting a vertex because we have recently sent a request")
return
}


@ -2275,7 +2275,7 @@ func TestEngineBootstrappingIntoConsensus(t *testing.T) {
panic("Unknown vertex requested")
}
sender.GetF = func(inVdr ids.ShortID, reqID uint32, vtxID ids.ID) {
sender.GetAncestorsF = func(inVdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdrID.Equals(inVdr) {
t.Fatalf("Asking wrong validator for vertex")
}
@ -2318,7 +2318,7 @@ func TestEngineBootstrappingIntoConsensus(t *testing.T) {
panic("Unknown bytes provided")
}
te.Put(vdrID, *requestID, vtxID0, vtxBytes0)
te.MultiPut(vdrID, *requestID, [][]byte{vtxBytes0})
vm.ParseTxF = nil
st.parseVertex = nil


@ -5,15 +5,31 @@ package common
import (
stdmath "math"
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/math"
)
const (
// MaxContainersPerMultiPut is the maximum number of containers that can be sent in a MultiPut
MaxContainersPerMultiPut = 2000
// StatusUpdateFrequency is how often progress is logged: the bootstrapper logs "processed X blocks/vertices" every [StatusUpdateFrequency] blocks/vertices
StatusUpdateFrequency = 2500
)
var (
// MaxTimeFetchingAncestors is the maximum amount of time to spend fetching vertices during a call to GetAncestors
MaxTimeFetchingAncestors = 100 * time.Millisecond
)
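Together with maxContainersLen in the engines, these bound a single MultiPut response three ways: at most MaxContainersPerMultiPut (2000) containers, at most roughly 1.6 MB of container bytes, and at most MaxTimeFetchingAncestors (100 ms) of wall-clock time spent collecting ancestors.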
// Bootstrapper implements the Engine interface.
type Bootstrapper struct {
Config
// IDs of validators we have requested the accepted frontier from but haven't
// received a reply from
pendingAcceptedFrontier ids.ShortSet
acceptedFrontier ids.Set
@ -43,6 +59,7 @@ func (b *Bootstrapper) Startup() error {
return b.Bootstrapable.ForceAccepted(ids.Set{})
}
// Ask each of the bootstrap validators to send their accepted frontier
vdrs := ids.ShortSet{}
vdrs.Union(b.pendingAcceptedFrontier)
@ -59,6 +76,7 @@ func (b *Bootstrapper) GetAcceptedFrontier(validatorID ids.ShortID, requestID ui
// GetAcceptedFrontierFailed implements the Engine interface.
func (b *Bootstrapper) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) error {
// If we can't get a response from [validatorID], act as though they said their accepted frontier is empty
return b.AcceptedFrontier(validatorID, requestID, ids.Set{})
}
@ -69,10 +87,16 @@ func (b *Bootstrapper) AcceptedFrontier(validatorID ids.ShortID, requestID uint3
b.Context.Log.Debug("Received an AcceptedFrontier message from %s unexpectedly", validatorID)
return nil
}
// Mark that we received a response from [validatorID]
b.pendingAcceptedFrontier.Remove(validatorID)
// Union the reported accepted frontier from [validatorID] with the accepted frontier we got from others
b.acceptedFrontier.Union(containerIDs)
// We've received the accepted frontier from every bootstrap validator
// Ask each bootstrap validator to filter the list of containers that we were
// told are on the accepted frontier such that the list only contains containers
// they think are accepted
if b.pendingAcceptedFrontier.Len() == 0 {
vdrs := ids.ShortSet{}
vdrs.Union(b.pendingAccepted)
@ -91,6 +115,8 @@ func (b *Bootstrapper) GetAccepted(validatorID ids.ShortID, requestID uint32, co
// GetAcceptedFailed implements the Engine interface.
func (b *Bootstrapper) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) error {
// If we can't get a response from [validatorID], act as though they said
// that they think none of the containers we sent them in GetAccepted are accepted
return b.Accepted(validatorID, requestID, ids.Set{})
}
@ -100,6 +126,7 @@ func (b *Bootstrapper) Accepted(validatorID ids.ShortID, requestID uint32, conta
b.Context.Log.Debug("Received an Accepted message from %s unexpectedly", validatorID)
return nil
}
// Mark that we received a response from [validatorID]
b.pendingAccepted.Remove(validatorID)
weight := uint64(0)
@ -121,6 +148,8 @@ func (b *Bootstrapper) Accepted(validatorID ids.ShortID, requestID uint32, conta
return nil
}
// We've received the filtered accepted frontier from every bootstrap validator
// Accept all containers that have a sufficient weight behind them
accepted := ids.Set{}
for key, weight := range b.acceptedVotes {
if weight >= b.Config.Alpha {


@ -135,6 +135,21 @@ type FetchHandler interface {
// dropped.
Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
// Notify this engine of a request for a container and its ancestors.
// The request is from validator [validatorID]. The requested container is [containerID].
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID. It is also not safe to
// assume the requested containerID exists. However, the validatorID is
// assumed to be authenticated.
//
// This engine should respond with a MultiPut message with the same requestID,
// which contains [containerID] as well as its ancestors. See MultiPut's documentation.
//
// If this engine doesn't have some of the ancestors, it should reply with a best-effort attempt at providing them.
// If this engine doesn't have [containerID] it can ignore this message.
GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
// Notify this engine of a container.
//
// This function can be called by any validator. It is not safe to assume
@ -152,6 +167,24 @@ type FetchHandler interface {
container []byte,
) error
// Notify this engine of multiple containers.
// Each element of [containers] is the byte representation of a container.
//
// This should only be called during bootstrapping, and in response to a GetAncestors message to
// [validatorID] with request ID [requestID]. This call should contain the container requested in
// that message, along with ancestors.
// The containers should be in BFS order (i.e. the first container must be the container
// requested in the GetAncestors message, and ancestors further back are later in [containers]).
//
// It is not safe to assume this message is in response to a GetAncestors message, that this
// message has a unique requestID or that any of the containers in [containers] are valid.
// However, the validatorID is assumed to be authenticated.
MultiPut(
validatorID ids.ShortID,
requestID uint32,
containers [][]byte,
) error
// Notify this engine that a get request it issued has failed.
//
// This function will be called if the engine sent a Get message that is not
@ -161,6 +194,16 @@ type FetchHandler interface {
// The validatorID and requestID are assumed to be the same as those sent in
// the Get message.
GetFailed(validatorID ids.ShortID, requestID uint32) error
// Notify this engine that a GetAncestors request it issued has failed.
//
// This function will be called if the engine sent a GetAncestors message that is not
// anticipated to be responded to. This could be because the recipient of
// the message is unknown or if the message request has timed out.
//
// The validatorID and requestID are assumed to be the same as those sent in
// the GetAncestors message.
GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) error
}
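A hedged sketch of the receive-side checks the MultiPut contract above implies; validateMultiPut, parse, and the string IDs are illustrative stand-ins, not gecko APIs.

package main

import "fmt"

// validateMultiPut checks a MultiPut response the way the docs above
// describe: a non-empty, bounded batch whose first container parses to
// the requested ID. Remaining containers are best-effort ancestors in
// BFS order. (Illustrative only; not the engine's actual code.)
func validateMultiPut(
	requestedID string,
	containers [][]byte,
	maxContainers int,
	parse func([]byte) (string, error),
) error {
	if len(containers) == 0 || len(containers) > maxContainers {
		return fmt.Errorf("got %d containers, want between 1 and %d", len(containers), maxContainers)
	}
	firstID, err := parse(containers[0])
	if err != nil {
		return fmt.Errorf("first container unparseable: %w", err)
	}
	if firstID != requestedID {
		return fmt.Errorf("first container is %s, expected %s", firstID, requestedID)
	}
	return nil
}

func main() {
	parse := func(b []byte) (string, error) { return fmt.Sprintf("id-%x", b), nil }
	err := validateMultiPut("id-01", [][]byte{{0x01}, {0x02}}, 2000, parse)
	fmt.Println(err) // <nil>
}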
// QueryHandler defines how a consensus engine reacts to query messages from


@ -50,9 +50,17 @@ type FetchSender interface {
// to this validator
Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
// GetAncestors requests that the validator with ID [validatorID] send container [containerID] and its
// ancestors. The maximum number of ancestors to send in response is defined in snow/engine/common/bootstrapper.go
GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
// Tell the specified validator that the container whose ID is <containerID>
// has body <container>
Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte)
// Give the specified validator several containers at once
// Should be in response to a GetAncestors message with request ID [requestID] from the validator
MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte)
}
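A hedged usage sketch of the expected call pairing, where vdr, reqID, blkID, and containers are placeholders:

// Requester side: ask vdr for blkID and its ancestors under request reqID.
sender.GetAncestors(vdr, reqID, blkID)

// Responder side: reply on the same request ID; containers[0] should hold
// blkID's container, followed by its ancestors in BFS order (best effort).
sender.MultiPut(vdr, reqID, containers)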
// QuerySender defines how a consensus engine sends query messages to other


@ -32,21 +32,26 @@ type EngineTest struct {
CantAccepted,
CantGet,
CantGetAncestors,
CantGetFailed,
CantGetAncestorsFailed,
CantPut,
CantMultiPut,
CantPushQuery,
CantPullQuery,
CantQueryFailed,
CantChits bool
ContextF func() *snow.Context
StartupF, GossipF, ShutdownF func() error
NotifyF func(Message) error
GetF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
PutF, PushQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error
AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) error
GetAcceptedFrontierF, GetFailedF, QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(validatorID ids.ShortID, requestID uint32) error
ContextF func() *snow.Context
StartupF, GossipF, ShutdownF func() error
NotifyF func(Message) error
GetF, GetAncestorsF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
PutF, PushQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error
MultiPutF func(validatorID ids.ShortID, requestID uint32, containers [][]byte) error
AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) error
GetAcceptedFrontierF, GetFailedF, GetAncestorsFailedF,
QueryFailedF, GetAcceptedFrontierFailedF, GetAcceptedFailedF func(validatorID ids.ShortID, requestID uint32) error
}
var _ Engine = &EngineTest{}
@ -70,8 +75,11 @@ func (e *EngineTest) Default(cant bool) {
e.CantAccepted = cant
e.CantGet = cant
e.CantGetAncestors = cant
e.CantGetAncestorsFailed = cant
e.CantGetFailed = cant
e.CantPut = cant
e.CantMultiPut = cant
e.CantPushQuery = cant
e.CantPullQuery = cant
@ -233,6 +241,16 @@ func (e *EngineTest) Get(validatorID ids.ShortID, requestID uint32, containerID
return nil
}
// GetAncestors ...
func (e *EngineTest) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error {
if e.GetAncestorsF != nil {
return e.GetAncestorsF(validatorID, requestID, containerID)
} else if e.CantGetAncestors && e.T != nil {
e.T.Fatalf("Unexpectedly called GetAncestors")
}
return nil
}
// GetFailed ...
func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32) error {
if e.GetFailedF != nil {
@ -246,6 +264,19 @@ func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32) error
return nil
}
// GetAncestorsFailed ...
func (e *EngineTest) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) error {
if e.GetAncestorsFailedF != nil {
return e.GetAncestorsFailedF(validatorID, requestID)
} else if e.CantGetAncestorsFailed {
if e.T != nil {
e.T.Fatalf("Unexpectedly called GetAncestorsFailed")
}
return errors.New("Unexpectedly called GetAncestorsFailed")
}
return nil
}
// Put ...
func (e *EngineTest) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error {
if e.PutF != nil {
@ -259,6 +290,19 @@ func (e *EngineTest) Put(validatorID ids.ShortID, requestID uint32, containerID
return nil
}
// MultiPut ...
func (e *EngineTest) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) error {
if e.MultiPutF != nil {
return e.MultiPutF(validatorID, requestID, containers)
} else if e.CantMultiPut {
if e.T != nil {
e.T.Fatalf("Unexpectedly called MultiPut")
}
return errors.New("Unexpectedly called MultiPut")
}
return nil
}
// PushQuery ...
func (e *EngineTest) PushQuery(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) error {
if e.PushQueryF != nil {


@ -15,7 +15,7 @@ type SenderTest struct {
CantGetAcceptedFrontier, CantAcceptedFrontier,
CantGetAccepted, CantAccepted,
CantGet, CantPut,
CantGet, CantGetAncestors, CantPut, CantMultiPut,
CantPullQuery, CantPushQuery, CantChits,
CantGossip bool
@ -24,7 +24,9 @@ type SenderTest struct {
GetAcceptedF func(ids.ShortSet, uint32, ids.Set)
AcceptedF func(ids.ShortID, uint32, ids.Set)
GetF func(ids.ShortID, uint32, ids.ID)
GetAncestorsF func(ids.ShortID, uint32, ids.ID)
PutF func(ids.ShortID, uint32, ids.ID, []byte)
MultiPutF func(ids.ShortID, uint32, [][]byte)
PushQueryF func(ids.ShortSet, uint32, ids.ID, []byte)
PullQueryF func(ids.ShortSet, uint32, ids.ID)
ChitsF func(ids.ShortID, uint32, ids.Set)
@ -38,7 +40,9 @@ func (s *SenderTest) Default(cant bool) {
s.CantGetAccepted = cant
s.CantAccepted = cant
s.CantGet = cant
s.CantGetAncestors = cant
s.CantPut = cant
s.CantMultiPut = cant
s.CantPullQuery = cant
s.CantPushQuery = cant
s.CantChits = cant
@ -100,6 +104,17 @@ func (s *SenderTest) Get(vdr ids.ShortID, requestID uint32, vtxID ids.ID) {
}
}
// GetAncestors calls GetAncestorsF if it was initialized. If it
// wasn't initialized and this function shouldn't be called and testing was
// initialized, then testing will fail.
func (s *SenderTest) GetAncestors(validatorID ids.ShortID, requestID uint32, vtxID ids.ID) {
if s.GetAncestorsF != nil {
s.GetAncestorsF(validatorID, requestID, vtxID)
} else if s.CantGetAncestors && s.T != nil {
s.T.Fatalf("Unexpectedly called CantGetAncestors")
}
}
// Put calls PutF if it was initialized. If it wasn't initialized and this
// function shouldn't be called and testing was initialized, then testing will
// fail.
@ -111,6 +126,17 @@ func (s *SenderTest) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtx []
}
}
// MultiPut calls MultiPutF if it was initialized. If it wasn't initialized and this
// function shouldn't be called and testing was initialized, then testing will
// fail.
func (s *SenderTest) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte) {
if s.MultiPutF != nil {
s.MultiPutF(vdr, requestID, vtxs)
} else if s.CantMultiPut && s.T != nil {
s.T.Fatalf("Unexpectedly called MultiPut")
}
}
// PushQuery calls PushQueryF if it was initialized. If it wasn't initialized
// and this function shouldn't be called and testing was initialized, then
// testing will fail.


@ -22,9 +22,6 @@ type BootstrapConfig struct {
// Blocked tracks operations that are blocked on blocks
Blocked *queue.Jobs
// blocks that have outstanding get requests
blkReqs common.Requests
VM ChainVM
Bootstrapped func()
@ -35,8 +32,19 @@ type bootstrapper struct {
metrics
common.Bootstrapper
pending ids.Set
finished bool
// true if all of the blocks in the original accepted frontier have been processed
processedStartingAcceptedFrontier bool
// Number of blocks processed
numProcessed uint32
// tracks which validators were asked for which containers in which requests
outstandingRequests common.Requests
// true if bootstrapping is done
finished bool
// Called when bootstrapping is done
onFinished func() error
}
@ -56,14 +64,14 @@ func (b *bootstrapper) Initialize(config BootstrapConfig) error {
return nil
}
// CurrentAcceptedFrontier ...
// CurrentAcceptedFrontier returns the last accepted block
func (b *bootstrapper) CurrentAcceptedFrontier() ids.Set {
acceptedFrontier := ids.Set{}
acceptedFrontier.Add(b.VM.LastAccepted())
return acceptedFrontier
}
// FilterAccepted ...
// FilterAccepted returns the blocks in [containerIDs] that we have accepted
func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
acceptedIDs := ids.Set{}
for _, blkID := range containerIDs.List() {
@ -82,105 +90,103 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
}
for _, blkID := range acceptedContainerIDs.List() {
if err := b.fetch(blkID); err != nil {
if blk, err := b.VM.GetBlock(blkID); err == nil {
if err := b.process(blk); err != nil {
return err
}
} else if err := b.fetch(blkID); err != nil {
return err
}
}
if numPending := b.pending.Len(); numPending == 0 {
// TODO: This typically indicates bootstrapping has failed, so this
// should be handled appropriately
b.processedStartingAcceptedFrontier = true
if numPending := b.outstandingRequests.Len(); numPending == 0 {
return b.finish()
}
return nil
}
// Put ...
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error {
b.BootstrapConfig.Context.Log.Verbo("Put called for blkID %s", blkID)
blk, err := b.VM.ParseBlock(blkBytes)
if err != nil {
b.BootstrapConfig.Context.Log.Debug("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
b.GetFailed(vdr, requestID)
return nil
}
if !b.pending.Contains(blk.ID()) {
b.BootstrapConfig.Context.Log.Debug("Validator %s sent an unrequested block:\n%s",
vdr,
formatting.DumpBytes{Bytes: blkBytes})
b.GetFailed(vdr, requestID)
return nil
}
return b.addBlock(blk)
}
// GetFailed ...
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
blkID, ok := b.blkReqs.Remove(vdr, requestID)
if !ok {
b.BootstrapConfig.Context.Log.Debug("GetFailed called without sending the corresponding Get message from %s",
vdr)
return nil
}
b.sendRequest(blkID)
return nil
}
// Get block [blkID] and its ancestors from a validator
func (b *bootstrapper) fetch(blkID ids.ID) error {
if b.pending.Contains(blkID) {
// Make sure we haven't already requested this block
if b.outstandingRequests.Contains(blkID) {
return nil
}
blk, err := b.VM.GetBlock(blkID)
if err != nil {
b.sendRequest(blkID)
// Make sure we don't already have this block
if _, err := b.VM.GetBlock(blkID); err == nil {
return nil
}
return b.storeBlock(blk)
}
func (b *bootstrapper) sendRequest(blkID ids.ID) {
validators := b.BootstrapConfig.Validators.Sample(1)
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
if len(validators) == 0 {
b.BootstrapConfig.Context.Log.Error("Dropping request for %s as there are no validators", blkID)
return
return fmt.Errorf("Dropping request for %s as there are no validators", blkID)
}
validatorID := validators[0].ID()
b.RequestID++
b.blkReqs.RemoveAny(blkID)
b.blkReqs.Add(validatorID, b.RequestID, blkID)
b.pending.Add(blkID)
b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, blkID)
b.numPendingRequests.Set(float64(b.pending.Len()))
}
func (b *bootstrapper) addBlock(blk snowman.Block) error {
if err := b.storeBlock(blk); err != nil {
return err
}
if numPending := b.pending.Len(); numPending == 0 {
return b.finish()
}
b.outstandingRequests.Add(validatorID, b.RequestID, blkID)
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, blkID) // request block and ancestors
return nil
}
func (b *bootstrapper) storeBlock(blk snowman.Block) error {
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
// with request ID [requestID]
func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, blks [][]byte) error {
if lenBlks := len(blks); lenBlks > common.MaxContainersPerMultiPut {
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains more than maximum number of blocks", vdr, requestID)
return b.GetAncestorsFailed(vdr, requestID)
} else if lenBlks == 0 {
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains no blocks", vdr, requestID)
return b.GetAncestorsFailed(vdr, requestID)
}
// Make sure this is in response to a request we made
wantedBlkID, ok := b.outstandingRequests.Remove(vdr, requestID)
if !ok { // this message isn't in response to a request we made
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
return nil
}
wantedBlk, err := b.VM.ParseBlock(blks[0]) // the block we requested
if err != nil {
b.BootstrapConfig.Context.Log.Debug("Failed to parse requested block %s: %w", wantedBlkID, err)
return b.fetch(wantedBlkID)
} else if actualID := wantedBlk.ID(); !actualID.Equals(wantedBlkID) {
b.BootstrapConfig.Context.Log.Debug("expected the first block to be the requested block, %s, but is %s", wantedBlk, actualID)
return b.fetch(wantedBlkID)
}
for _, blkBytes := range blks {
if _, err := b.VM.ParseBlock(blkBytes); err != nil { // persists the block
b.BootstrapConfig.Context.Log.Debug("Failed to parse block: %w", err)
b.BootstrapConfig.Context.Log.Verbo("block: %s", formatting.DumpBytes{Bytes: blkBytes})
}
}
return b.process(wantedBlk)
}
// GetAncestorsFailed is called when a GetAncestors message we sent fails
func (b *bootstrapper) GetAncestorsFailed(vdr ids.ShortID, requestID uint32) error {
blkID, ok := b.outstandingRequests.Remove(vdr, requestID)
if !ok {
b.BootstrapConfig.Context.Log.Debug("GetAncestorsFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
return nil
}
// Send another request for this block
return b.fetch(blkID)
}
// process a block
func (b *bootstrapper) process(blk snowman.Block) error {
status := blk.Status()
blkID := blk.ID()
for status == choices.Processing {
b.pending.Remove(blkID)
b.numProcessed++ // Progress tracker
if b.numProcessed%common.StatusUpdateFrequency == 0 { // Periodically print progress
b.BootstrapConfig.Context.Log.Info("processed %d blocks", b.numProcessed)
}
if err := b.Blocked.Push(&blockJob{
numAccepted: b.numBootstrapped,
numDropped: b.numDropped,
@ -193,6 +199,7 @@ func (b *bootstrapper) storeBlock(blk snowman.Block) error {
return err
}
// Process this block's parent
blk = blk.Parent()
status = blk.Status()
blkID = blk.ID()
@ -200,15 +207,16 @@ func (b *bootstrapper) storeBlock(blk snowman.Block) error {
switch status := blk.Status(); status {
case choices.Unknown:
b.sendRequest(blkID)
case choices.Accepted:
b.BootstrapConfig.Context.Log.Verbo("Bootstrapping confirmed %s", blkID)
case choices.Rejected:
if err := b.fetch(blkID); err != nil {
return err
}
case choices.Rejected: // Should never happen
return fmt.Errorf("bootstrapping wants to accept %s, however it was previously rejected", blkID)
}
numPending := b.pending.Len()
b.numPendingRequests.Set(float64(numPending))
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
return b.finish()
}
return nil
}


@ -78,8 +78,9 @@ func newConfig(t *testing.T) (BootstrapConfig, ids.ShortID, *common.SenderTest,
}, peerID, sender, vm
}
// Single node in the accepted frontier; no need to fetch parent
func TestBootstrapperSingleFrontier(t *testing.T) {
config, peerID, sender, vm := newConfig(t)
config, _, _, vm := newConfig(t)
blkID0 := ids.Empty.Prefix(0)
blkID1 := ids.Empty.Prefix(1)
@ -104,6 +105,8 @@ func TestBootstrapperSingleFrontier(t *testing.T) {
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
acceptedIDs := ids.Set{}
acceptedIDs.Add(blkID1)
@ -111,61 +114,41 @@ func TestBootstrapperSingleFrontier(t *testing.T) {
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID1):
return nil, errUnknownBlock
return blk1, nil
case blkID.Equals(blkID0):
return blk0, nil
default:
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
reqID := new(uint32)
sender.GetF = func(vdr ids.ShortID, innerReqID uint32, blkID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
}
switch {
case blkID.Equals(blkID1):
default:
t.Fatalf("Requested unknown vertex")
}
*reqID = innerReqID
}
vm.CantBootstrapping = false
bs.ForceAccepted(acceptedIDs)
vm.GetBlockF = nil
sender.GetF = nil
vm.CantBootstrapping = true
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes1):
return blk1, nil
case bytes.Equal(blkBytes, blkBytes0):
return blk0, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
vm.CantBootstrapping = false
vm.CantBootstrapped = false
bs.Put(peerID, *reqID, blkID1, blkBytes1)
vm.ParseBlockF = nil
bs.onFinished = nil
vm.CantBootstrapped = true
if !*finished {
if err := bs.ForceAccepted(acceptedIDs); err != nil { // should finish
t.Fatal(err)
} else if !*finished {
t.Fatalf("Bootstrapping should have finished")
}
if blk1.Status() != choices.Accepted {
} else if blk1.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
}
}
// Requests the unknown block and gets back a MultiPut with unexpected request ID.
// Requests again and gets response from unexpected peer.
// Requests again and gets an unexpected block.
// Requests again and gets the expected block.
func TestBootstrapperUnknownByzantineResponse(t *testing.T) {
config, peerID, sender, vm := newConfig(t)
@ -177,107 +160,6 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) {
blkBytes1 := []byte{1}
blkBytes2 := []byte{2}
blk0 := &Blk{
id: blkID0,
height: 0,
status: choices.Accepted,
bytes: blkBytes0,
}
blk1 := &Blk{
parent: blk0,
id: blkID1,
height: 1,
status: choices.Processing,
bytes: blkBytes1,
}
blk2 := &Blk{
parent: blk1,
id: blkID2,
height: 2,
status: choices.Processing,
bytes: blkBytes2,
}
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
acceptedIDs := ids.Set{}
acceptedIDs.Add(blkID1)
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID1):
return nil, errUnknownBlock
default:
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
requestID := new(uint32)
sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
}
switch {
case vtxID.Equals(blkID1):
default:
t.Fatalf("Requested unknown block")
}
*requestID = reqID
}
vm.CantBootstrapping = false
bs.ForceAccepted(acceptedIDs)
vm.GetBlockF = nil
vm.CantBootstrapping = true
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes1):
return blk1, nil
case bytes.Equal(blkBytes, blkBytes2):
return blk2, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
vm.CantBootstrapped = false
bs.Put(peerID, *requestID, blkID2, blkBytes2)
bs.Put(peerID, *requestID, blkID1, blkBytes1)
vm.ParseBlockF = nil
vm.CantBootstrapped = true
if !*finished {
t.Fatalf("Bootstrapping should have finished")
}
if blk1.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
}
if blk2.Status() != choices.Processing {
t.Fatalf("Block should be processing")
}
}
func TestBootstrapperDependency(t *testing.T) {
config, peerID, sender, vm := newConfig(t)
blkID0 := ids.Empty.Prefix(0)
blkID1 := ids.Empty.Prefix(1)
blkID2 := ids.Empty.Prefix(2)
blkBytes0 := []byte{0}
blkBytes1 := []byte{1}
blkBytes2 := []byte{2}
blk0 := &Blk{
id: blkID0,
height: 0,
@ -302,44 +184,36 @@ func TestBootstrapperDependency(t *testing.T) {
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
acceptedIDs := ids.Set{}
acceptedIDs.Add(blkID2)
parsedBlk1 := false
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID0):
return blk0, nil
case blkID.Equals(blkID1):
if parsedBlk1 {
return blk1, nil
}
return nil, errUnknownBlock
case blkID.Equals(blkID2):
return blk2, nil
default:
t.Fatalf("Requested unknown block")
panic("Requested unknown block")
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes0):
return blk0, nil
case bytes.Equal(blkBytes, blkBytes1):
blk1.status = choices.Processing
parsedBlk1 = true
return blk1, nil
case bytes.Equal(blkBytes, blkBytes2):
return blk2, nil
@ -348,21 +222,325 @@ func TestBootstrapperDependency(t *testing.T) {
return nil, errUnknownBlock
}
requestID := new(uint32)
sender.GetAncestorsF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
}
switch {
case vtxID.Equals(blkID1):
default:
t.Fatalf("should have requested blk1")
}
*requestID = reqID
}
vm.CantBootstrapping = false
if err := bs.ForceAccepted(acceptedIDs); err != nil { // should request blk1
t.Fatal(err)
}
oldReqID := *requestID
if err := bs.MultiPut(peerID, *requestID+1, [][]byte{blkBytes1}); err != nil { // respond with wrong request ID
t.Fatal(err)
} else if oldReqID != *requestID {
t.Fatal("should not have sent new request")
}
if err := bs.MultiPut(ids.NewShortID([20]byte{1, 2, 3}), *requestID, [][]byte{blkBytes1}); err != nil { // respond from wrong peer
t.Fatal(err)
} else if oldReqID != *requestID {
t.Fatal("should not have sent new request")
}
if err := bs.MultiPut(peerID, *requestID, [][]byte{blkBytes0}); err != nil { // respond with wrong block
t.Fatal(err)
} else if oldReqID == *requestID {
t.Fatal("should have sent new request")
}
vm.CantBootstrapped = false
if err := bs.MultiPut(peerID, *requestID, [][]byte{blkBytes1}); err != nil { // respond with right block
t.Fatal(err)
} else if !*finished {
t.Fatalf("Bootstrapping should have finished")
} else if blk0.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
} else if blk1.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
} else if blk2.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
}
}
// There are multiple needed blocks and MultiPut returns one at a time
func TestBootstrapperPartialFetch(t *testing.T) {
config, peerID, sender, vm := newConfig(t)
blkID0 := ids.Empty.Prefix(0)
blkID1 := ids.Empty.Prefix(1)
blkID2 := ids.Empty.Prefix(2)
blkID3 := ids.Empty.Prefix(3)
blkBytes0 := []byte{0}
blkBytes1 := []byte{1}
blkBytes2 := []byte{2}
blkBytes3 := []byte{3}
blk0 := &Blk{
id: blkID0,
height: 0,
status: choices.Accepted,
bytes: blkBytes0,
}
blk1 := &Blk{
parent: blk0,
id: blkID1,
height: 1,
status: choices.Unknown,
bytes: blkBytes1,
}
blk2 := &Blk{
parent: blk1,
id: blkID2,
height: 2,
status: choices.Unknown,
bytes: blkBytes2,
}
blk3 := &Blk{
parent: blk2,
id: blkID3,
height: 3,
status: choices.Processing,
bytes: blkBytes3,
}
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
acceptedIDs := ids.Set{}
acceptedIDs.Add(blkID3)
parsedBlk1 := false
parsedBlk2 := false
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID0):
return blk0, nil
case blkID.Equals(blkID1):
if parsedBlk1 {
return blk1, nil
}
return nil, errUnknownBlock
case blkID.Equals(blkID2):
if parsedBlk2 {
return blk2, nil
}
return nil, errUnknownBlock
case blkID.Equals(blkID3):
return blk3, nil
default:
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes0):
return blk0, nil
case bytes.Equal(blkBytes, blkBytes1):
blk1.status = choices.Processing
parsedBlk1 = true
return blk1, nil
case bytes.Equal(blkBytes, blkBytes2):
blk2.status = choices.Processing
parsedBlk2 = true
return blk2, nil
case bytes.Equal(blkBytes, blkBytes3):
return blk3, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
requestID := new(uint32)
requested := ids.Empty
sender.GetAncestorsF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
}
switch {
case vtxID.Equals(blkID1), vtxID.Equals(blkID2):
default:
t.Fatalf("should have requested blk1 or blk2")
}
*requestID = reqID
requested = vtxID
}
vm.CantBootstrapping = false
if err := bs.ForceAccepted(acceptedIDs); err != nil { // should request blk2
t.Fatal(err)
}
if err := bs.MultiPut(peerID, *requestID, [][]byte{blkBytes2}); err != nil { // respond with blk2
t.Fatal(err)
} else if !requested.Equals(blkID1) {
t.Fatal("should have requested blk1")
}
vm.CantBootstrapped = false
if err := bs.MultiPut(peerID, *requestID, [][]byte{blkBytes1}); err != nil { // respond with blk1
t.Fatal(err)
} else if !requested.Equals(blkID1) {
t.Fatal("should not have requested another block")
}
if !*finished {
t.Fatalf("Bootstrapping should have finished")
} else if blk0.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
} else if blk1.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
} else if blk2.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
}
}
// There are multiple needed blocks and MultiPut returns all at once
func TestBootstrapperMultiPut(t *testing.T) {
config, peerID, sender, vm := newConfig(t)
blkID0 := ids.Empty.Prefix(0)
blkID1 := ids.Empty.Prefix(1)
blkID2 := ids.Empty.Prefix(2)
blkID3 := ids.Empty.Prefix(3)
blkBytes0 := []byte{0}
blkBytes1 := []byte{1}
blkBytes2 := []byte{2}
blkBytes3 := []byte{3}
blk0 := &Blk{
id: blkID0,
height: 0,
status: choices.Accepted,
bytes: blkBytes0,
}
blk1 := &Blk{
parent: blk0,
id: blkID1,
height: 1,
status: choices.Unknown,
bytes: blkBytes1,
}
blk2 := &Blk{
parent: blk1,
id: blkID2,
height: 2,
status: choices.Unknown,
bytes: blkBytes2,
}
blk3 := &Blk{
parent: blk2,
id: blkID3,
height: 3,
status: choices.Processing,
bytes: blkBytes3,
}
vm.CantBootstrapping = false
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
acceptedIDs := ids.Set{}
acceptedIDs.Add(blkID3)
parsedBlk1 := false
parsedBlk2 := false
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID0):
return blk0, nil
case blkID.Equals(blkID1):
if parsedBlk1 {
return blk1, nil
}
return nil, errUnknownBlock
case blkID.Equals(blkID2):
if parsedBlk2 {
return blk2, nil
}
return nil, errUnknownBlock
case blkID.Equals(blkID3):
return blk3, nil
default:
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes0):
return blk0, nil
case bytes.Equal(blkBytes, blkBytes1):
blk1.status = choices.Processing
parsedBlk1 = true
return blk1, nil
case bytes.Equal(blkBytes, blkBytes2):
blk2.status = choices.Processing
parsedBlk2 = true
return blk2, nil
case bytes.Equal(blkBytes, blkBytes3):
return blk3, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
requestID := new(uint32)
requested := ids.Empty
sender.GetAncestorsF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
}
switch {
case vtxID.Equals(blkID1), vtxID.Equals(blkID2):
default:
t.Fatalf("should have requested blk1 or blk2")
}
*requestID = reqID
requested = vtxID
}
if err := bs.ForceAccepted(acceptedIDs); err != nil { // should request blk2
t.Fatal(err)
}
vm.CantBootstrapped = false
if err := bs.MultiPut(peerID, *requestID, [][]byte{blkBytes2, blkBytes1}); err != nil { // respond with blk2 and blk1
t.Fatal(err)
} else if !requested.Equals(blkID2) {
t.Fatal("should not have requested another block")
}
if !*finished {
t.Fatalf("Bootstrapping should have finished")
} else if blk0.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
} else if blk1.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
} else if blk2.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
}
}
@ -444,169 +622,3 @@ func TestBootstrapperFilterAccepted(t *testing.T) {
t.Fatalf("Blk shouldn't be accepted")
}
}
func TestBootstrapperPartialFetch(t *testing.T) {
config, _, sender, vm := newConfig(t)
blkID0 := ids.Empty.Prefix(0)
blkID1 := ids.Empty.Prefix(1)
blkBytes0 := []byte{0}
blk0 := &Blk{
id: blkID0,
height: 0,
status: choices.Accepted,
bytes: blkBytes0,
}
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
acceptedIDs := ids.Set{}
acceptedIDs.Add(
blkID0,
blkID1,
)
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID0):
return blk0, nil
case blkID.Equals(blkID1):
return nil, errUnknownBlock
default:
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
sender.CantGet = false
bs.onFinished = func() error { return nil }
vm.CantBootstrapping = false
bs.ForceAccepted(acceptedIDs)
if bs.finished {
t.Fatalf("should have requested a block")
}
if bs.pending.Len() != 1 {
t.Fatalf("wrong number pending")
}
}
func TestBootstrapperWrongIDByzantineResponse(t *testing.T) {
config, peerID, sender, vm := newConfig(t)
blkID0 := ids.Empty.Prefix(0)
blkID1 := ids.Empty.Prefix(1)
blkID2 := ids.Empty.Prefix(2)
blkBytes0 := []byte{0}
blkBytes1 := []byte{1}
blkBytes2 := []byte{2}
blk0 := &Blk{
id: blkID0,
height: 0,
status: choices.Accepted,
bytes: blkBytes0,
}
blk1 := &Blk{
parent: blk0,
id: blkID1,
height: 1,
status: choices.Processing,
bytes: blkBytes1,
}
blk2 := &Blk{
parent: blk1,
id: blkID2,
height: 2,
status: choices.Processing,
bytes: blkBytes2,
}
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
acceptedIDs := ids.Set{}
acceptedIDs.Add(blkID1)
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(blkID1):
return nil, errUnknownBlock
default:
t.Fatal(errUnknownBlock)
panic(errUnknownBlock)
}
}
requestID := new(uint32)
sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
}
switch {
case vtxID.Equals(blkID1):
default:
t.Fatalf("Requested unknown block")
}
*requestID = reqID
}
vm.CantBootstrapping = false
bs.ForceAccepted(acceptedIDs)
vm.GetBlockF = nil
sender.GetF = nil
vm.CantBootstrapping = true
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes2):
return blk2, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
sender.CantGet = false
bs.Put(peerID, *requestID, blkID1, blkBytes2)
sender.CantGet = true
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes1):
return blk1, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
finished := new(bool)
bs.onFinished = func() error { *finished = true; return nil }
vm.CantBootstrapped = false
bs.Put(peerID, *requestID, blkID1, blkBytes1)
vm.ParseBlockF = nil
vm.CantBootstrapped = true
if !*finished {
t.Fatalf("Bootstrapping should have finished")
}
if blk1.Status() != choices.Accepted {
t.Fatalf("Block should be accepted")
}
if blk2.Status() != choices.Processing {
t.Fatalf("Block should be processing")
}
}

View File

@ -4,7 +4,10 @@
package snowman
import (
"time"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/network"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/snowman"
@ -14,6 +17,12 @@ import (
"github.com/ava-labs/gecko/utils/wrappers"
)
const (
// TODO define this constant in one place rather than here and in avalanche
// Max containers size in a MultiPut message
maxContainersLen = int(4 * network.DefaultMaxMessageSize / 5)
)
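For intuition on the 4/5 factor: a MultiPut is capped by the network's maximum message size, and a fifth of that budget is reserved for everything that isn't container payload (op code, chain ID, request ID, and per-container length prefixes). A quick sketch of the arithmetic, using an assumed stand-in value since network.DefaultMaxMessageSize isn't shown in this diff:

package main

import "fmt"

func main() {
	// Assumed value purely for illustration; the real constant is
	// network.DefaultMaxMessageSize and may differ.
	const defaultMaxMessageSize = 2 * 1024 * 1024

	// Mirrors maxContainersLen: 4/5 of the message budget goes to
	// container payload, the remaining 1/5 to framing and metadata.
	maxContainersLen := 4 * defaultMaxMessageSize / 5
	fmt.Println(maxContainersLen) // 1677721
}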
// Transitive implements the Engine interface by attempting to fetch all
// transitive dependencies.
type Transitive struct {
@ -44,7 +53,7 @@ type Transitive struct {
// Initialize implements the Engine interface
func (t *Transitive) Initialize(config Config) error {
config.Context.Log.Info("Initializing Snowman consensus")
config.Context.Log.Info("initializing consensus engine")
t.Config = config
t.metrics.Initialize(
@ -78,7 +87,7 @@ func (t *Transitive) finishBootstrapping() error {
// oracle block
tail, err := t.Config.VM.GetBlock(tailID)
if err != nil {
t.Config.Context.Log.Error("Failed to get last accepted block due to: %s", err)
t.Config.Context.Log.Error("failed to get last accepted block due to: %s", err)
return err
}
@ -96,7 +105,7 @@ func (t *Transitive) finishBootstrapping() error {
t.Config.VM.SetPreference(tailID)
}
t.Config.Context.Log.Info("Bootstrapping finished with %s as the last accepted block", tailID)
t.Config.Context.Log.Info("bootstrapping finished with %s as the last accepted block", tailID)
return nil
}
@ -105,18 +114,18 @@ func (t *Transitive) Gossip() error {
blkID := t.Config.VM.LastAccepted()
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
t.Config.Context.Log.Warn("Dropping gossip request as %s couldn't be loaded due to %s", blkID, err)
t.Config.Context.Log.Warn("dropping gossip request as %s couldn't be loaded due to %s", blkID, err)
return nil
}
t.Config.Context.Log.Debug("Gossiping %s as accepted to the network", blkID)
t.Config.Context.Log.Verbo("gossiping %s as accepted to the network", blkID)
t.Config.Sender.Gossip(blkID, blk.Bytes())
return nil
}
// Shutdown implements the Engine interface
func (t *Transitive) Shutdown() error {
t.Config.Context.Log.Info("Shutting down Snowman consensus")
t.Config.Context.Log.Info("shutting down consensus engine")
return t.Config.VM.Shutdown()
}
@ -130,9 +139,7 @@ func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, blkID ids.ID) error
// If we failed to get the block, that means either an unexpected error
// has occurred, the validator is not following the protocol, or the
// block has been pruned.
t.Config.Context.Log.Warn("Get called for blockID %s errored with %s",
blkID,
err)
t.Config.Context.Log.Debug("Get(%s, %d, %s) failed with: %s", vdr, requestID, blkID, err)
return nil
}
@ -141,22 +148,51 @@ func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, blkID ids.ID) error
return nil
}
// GetAncestors implements the Engine interface
func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, blkID ids.ID) error {
startTime := time.Now()
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil { // Don't have the block. Drop this request.
t.Config.Context.Log.Verbo("couldn't get block %s. dropping GetAncestors(%s, %d, %s)", blkID, vdr, requestID, blkID)
return nil
}
ancestorsBytes := make([][]byte, 1, common.MaxContainersPerMultiPut) // First elt is byte repr. of blk, then its parents, then grandparent, etc.
ancestorsBytes[0] = blk.Bytes()
ancestorsBytesLen := len(blk.Bytes()) + wrappers.IntLen // length, in bytes, of all elements of ancestors
for numFetched := 1; numFetched < common.MaxContainersPerMultiPut && time.Since(startTime) < common.MaxTimeFetchingAncestors; numFetched++ {
blk = blk.Parent()
if blk.Status() == choices.Unknown {
break
}
blkBytes := blk.Bytes()
// Ensure response size isn't too large. Include wrappers.IntLen because the size of the message
// is included with each container, and the size is repr. by an int.
if newLen := wrappers.IntLen + ancestorsBytesLen + len(blkBytes); newLen < maxContainersLen {
ancestorsBytes = append(ancestorsBytes, blkBytes)
ancestorsBytesLen = newLen
} else { // reached maximum response size
break
}
}
t.Config.Sender.MultiPut(vdr, requestID, ancestorsBytes)
return nil
}
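The size accounting in this loop is the subtle part: each container costs its own bytes plus an int-sized length prefix once packed, and the walk stops at whichever limit is hit first (container count, byte budget, unknown parent, or the fetch-time cap). A minimal standalone sketch of just the byte accounting, with stand-in constants for wrappers.IntLen, common.MaxContainersPerMultiPut, and the budget; the time limit is omitted for brevity:

package main

import "fmt"

const (
	intLen           = 4       // stand-in for wrappers.IntLen
	maxContainers    = 2000    // stand-in for common.MaxContainersPerMultiPut
	maxContainersLen = 1 << 20 // stand-in for the 4/5 message budget
)

// batchAncestors mimics the GetAncestors loop: start from the requested
// block's bytes and walk toward the genesis until a limit is reached.
func batchAncestors(chain [][]byte) [][]byte {
	if len(chain) == 0 {
		return nil
	}
	out := [][]byte{chain[0]}
	outLen := intLen + len(chain[0])
	for i := 1; i < len(chain) && len(out) < maxContainers; i++ {
		// Each container costs its bytes plus an int-sized length prefix.
		if newLen := outLen + intLen + len(chain[i]); newLen < maxContainersLen {
			out = append(out, chain[i])
			outLen = newLen
		} else {
			break // response would exceed the size budget
		}
	}
	return out
}

func main() {
	blks := [][]byte{{0, 1, 2}, {3, 4}, {5}}
	fmt.Println(len(batchAncestors(blks))) // 3
}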
// Put implements the Engine interface
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error {
t.Config.Context.Log.Verbo("Put called for blockID %s", blkID)
// bootstrapping isn't done --> we didn't send any gets --> this put is invalid
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
return nil
}
blk, err := t.Config.VM.ParseBlock(blkBytes)
if err != nil {
t.Config.Context.Log.Debug("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
t.Config.Context.Log.Debug("failed to parse block %s: %s", blkID, err)
t.Config.Context.Log.Verbo("block:\n%s", formatting.DumpBytes{Bytes: blkBytes})
// because GetFailed doesn't utilize the assumption that we actually
// sent a Get message, we can safely call GetFailed here to potentially
// abandon the request.
@ -174,10 +210,10 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkByt
// GetFailed implements the Engine interface
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
// not done bootstrapping --> didn't send a get --> this message is invalid
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping GetFailed(%s, %d) due to bootstrapping", vdr, requestID)
return nil
}
// we don't use the assumption that this function is called after a failed
@ -185,8 +221,7 @@ func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
// and also get what the request was for if it exists
blkID, ok := t.blkReqs.Remove(vdr, requestID)
if !ok {
t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s",
vdr)
t.Config.Context.Log.Debug("getFailed(%s, %d) called without having sent corresponding Get", vdr, requestID)
return nil
}
@ -201,8 +236,7 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID)
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping",
blkID)
t.Config.Context.Log.Debug("dropping PullQuery(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
return nil
}
@ -234,16 +268,15 @@ func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID,
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", blkID)
t.Config.Context.Log.Debug("dropping PushQuery(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
return nil
}
blk, err := t.Config.VM.ParseBlock(blkBytes)
// If the parsing fails, we just drop the request, as we didn't ask for it
if err != nil {
t.Config.Context.Log.Warn("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
t.Config.Context.Log.Debug("failed to parse block %s: %s", blkID, err)
t.Config.Context.Log.Verbo("block:\n%s", formatting.DumpBytes{Bytes: blkBytes})
return nil
}
@ -264,17 +297,13 @@ func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID,
func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) error {
// if the engine hasn't been bootstrapped, we shouldn't be receiving chits
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping Chits due to bootstrapping")
t.Config.Context.Log.Debug("dropping Chits(%s, %d) due to bootstrapping", vdr, requestID)
return nil
}
// Since this is snowman, there should only be one ID in the vote set
if votes.Len() != 1 {
t.Config.Context.Log.Debug("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d",
votes.Len(),
vdr,
requestID)
t.Config.Context.Log.Debug("Chits(%s, %d) was called with %d votes (expected 1)", vdr, requestID, votes.Len())
// because QueryFailed doesn't utilize the assumption that we actually
// sent a Query message, we can safely call QueryFailed here to
// potentially abandon the request.
@ -282,7 +311,7 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) err
}
vote := votes.List()[0]
t.Config.Context.Log.Verbo("Chit was called. RequestID: %v. Vote: %s", requestID, vote)
t.Config.Context.Log.Verbo("Chits(%s, %d) contains vote for %s", vdr, requestID, vote)
v := &voter{
t: t,
@ -310,7 +339,7 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) err
func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) error {
// if the engine hasn't been bootstrapped, we won't have sent a query
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping QueryFailed due to bootstrapping")
t.Config.Context.Log.Warn("dropping QueryFailed(%s, %d) due to bootstrapping", vdr, requestID)
return nil
}
@ -326,24 +355,24 @@ func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) error {
func (t *Transitive) Notify(msg common.Message) error {
// if the engine hasn't been bootstrapped, we shouldn't be issuing blocks
if !t.bootstrapped {
t.Config.Context.Log.Debug("dropping Notify due to bootstrapping")
return nil
}
t.Config.Context.Log.Verbo("Snowman engine notified of %s from the vm", msg)
t.Config.Context.Log.Verbo("snowman engine notified of %s from the vm", msg)
switch msg {
case common.PendingTxs:
// the pending txs message means we should attempt to build a block.
blk, err := t.Config.VM.BuildBlock()
if err != nil {
t.Config.Context.Log.Verbo("VM.BuildBlock errored with %s", err)
t.Config.Context.Log.Debug("VM.BuildBlock errored with: %s", err)
return nil
}
// a newly created block is expected to be processing. If this check
// fails, there is potentially an error in the VM this engine is running
if status := blk.Status(); status != choices.Processing {
t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status)
t.Config.Context.Log.Warn("attempting to issue a block with status: %s, expected Processing", status)
}
// the newly created block should be built on top of the preferred
@ -351,7 +380,7 @@ func (t *Transitive) Notify(msg common.Message) error {
// confirmed.
parentID := blk.Parent().ID()
if pref := t.Consensus.Preference(); !parentID.Equals(pref) {
t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref)
t.Config.Context.Log.Warn("built block with parent: %s, expected %s", parentID, pref)
}
added, err := t.insertAll(blk)
@ -361,12 +390,12 @@ func (t *Transitive) Notify(msg common.Message) error {
// inserting the block shouldn't have any missing dependencies
if added {
t.Config.Context.Log.Verbo("Successfully issued new block from the VM")
t.Config.Context.Log.Verbo("successfully issued new block from the VM")
} else {
t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors")
}
default:
t.Config.Context.Log.Warn("Unexpected message from the VM: %s", msg)
t.Config.Context.Log.Warn("unexpected message from the VM: %s", msg)
}
return nil
}
@ -476,7 +505,7 @@ func (t *Transitive) insert(blk snowman.Block) error {
// block on the parent if needed
if parent := blk.Parent(); !t.Consensus.Issued(parent) {
parentID := parent.ID()
t.Config.Context.Log.Verbo("Block waiting for parent %s", parentID)
t.Config.Context.Log.Verbo("block %s waiting for parent %s", blkID, parentID)
i.deps.Add(parentID)
}
@ -494,10 +523,9 @@ func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) {
return
}
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.RequestID++
t.blkReqs.Add(vdr, t.RequestID, blkID)
t.Config.Context.Log.Verbo("sending Get(%s, %d, %s)", vdr, t.RequestID, blkID)
t.Config.Sender.Get(vdr, t.RequestID, blkID)
// Tracks performance statistics
@ -506,7 +534,7 @@ func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) {
// send a pull request for this block ID
func (t *Transitive) pullSample(blkID ids.ID) {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
t.Config.Context.Log.Verbo("about to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
vdrs := t.Config.Validators.Sample(p.K)
vdrSet := ids.ShortSet{}
@ -515,13 +543,13 @@ func (t *Transitive) pullSample(blkID ids.ID) {
}
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID)
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
return
}
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID)
t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID)
return
}
@ -530,7 +558,7 @@ func (t *Transitive) pullSample(blkID ids.ID) {
// send a push request for this block
func (t *Transitive) pushSample(blk snowman.Block) {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
t.Config.Context.Log.Verbo("about to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
vdrs := t.Config.Validators.Sample(p.K)
vdrSet := ids.ShortSet{}
@ -540,13 +568,13 @@ func (t *Transitive) pushSample(blk snowman.Block) {
blkID := blk.ID()
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID)
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
return
}
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID)
t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID)
return
}
@ -564,7 +592,7 @@ func (t *Transitive) deliver(blk snowman.Block) error {
t.pending.Remove(blkID)
if err := blk.Verify(); err != nil {
t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err)
t.Config.Context.Log.Debug("block failed verification due to %s, dropping block", err)
// if verify fails, then all decedents are also invalid
t.blocked.Abandon(blkID)
@ -572,7 +600,7 @@ func (t *Transitive) deliver(blk snowman.Block) error {
return t.errs.Err
}
t.Config.Context.Log.Verbo("Adding block to consensus: %s", blkID)
t.Config.Context.Log.Verbo("adding block to consensus: %s", blkID)
t.Consensus.Add(blk)
// Add all the oracle blocks if they exist. We call verify on all the blocks
@ -584,7 +612,7 @@ func (t *Transitive) deliver(blk snowman.Block) error {
case OracleBlock:
for _, blk := range blk.Options() {
if err := blk.Verify(); err != nil {
t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err)
t.Config.Context.Log.Debug("block failed verification due to %s, dropping block", err)
dropped = append(dropped, blk)
} else {
t.Consensus.Add(blk)

View File

@ -116,6 +116,15 @@ func (h *Handler) dispatchMsg(msg message) bool {
case getAcceptedFailedMsg:
err = h.engine.GetAcceptedFailed(msg.validatorID, msg.requestID)
h.getAcceptedFailed.Observe(float64(time.Now().Sub(startTime)))
case getAncestorsMsg:
err = h.engine.GetAncestors(msg.validatorID, msg.requestID, msg.containerID)
h.getAncestors.Observe(float64(time.Now().Sub(startTime)))
case getAncestorsFailedMsg:
err = h.engine.GetAncestorsFailed(msg.validatorID, msg.requestID)
h.getAncestorsFailed.Observe(float64(time.Now().Sub(startTime)))
case multiPutMsg:
err = h.engine.MultiPut(msg.validatorID, msg.requestID, msg.containers)
h.multiPut.Observe(float64(time.Now().Sub(startTime)))
case getMsg:
err = h.engine.Get(msg.validatorID, msg.requestID, msg.containerID)
h.get.Observe(float64(time.Now().Sub(startTime)))
@ -235,6 +244,16 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids
}
}
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
h.msgs <- message{
messageType: getAncestorsMsg,
validatorID: validatorID,
requestID: requestID,
containerID: containerID,
}
}
// Put passes a Put message received from the network to the consensus engine.
func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) {
h.metrics.pending.Inc()
@ -247,6 +266,16 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids
}
}
// MultiPut passes a MultiPut message received from the network to the consensus engine.
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) {
h.msgs <- message{
messageType: multiPutMsg,
validatorID: validatorID,
requestID: requestID,
containers: containers,
}
}
// GetFailed passes a GetFailed message to the consensus engine.
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
h.metrics.pending.Inc()
@ -257,6 +286,15 @@ func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
}
}
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
h.msgs <- message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
requestID: requestID,
}
}
// PushQuery passes a PushQuery message received from the network to the consensus engine.
func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) {
h.metrics.pending.Inc()

View File

@ -31,6 +31,9 @@ const (
notifyMsg
gossipMsg
shutdownMsg
getAncestorsMsg
multiPutMsg
getAncestorsFailedMsg
)
type message struct {
@ -39,6 +42,7 @@ type message struct {
requestID uint32
containerID ids.ID
container []byte
containers [][]byte
containerIDs ids.Set
notification common.Message
}
@ -74,8 +78,12 @@ func (t msgType) String() string {
return "Get Accepted Failed Message"
case getMsg:
return "Get Message"
case getAncestorsMsg:
return "Get Ancestors Message"
case putMsg:
return "Put Message"
case multiPutMsg:
return "MultiPut Message"
case getFailedMsg:
return "Get Failed Message"
case pushQueryMsg:

View File

@ -32,6 +32,7 @@ type metrics struct {
getAcceptedFrontier, acceptedFrontier, getAcceptedFrontierFailed,
getAccepted, accepted, getAcceptedFailed,
getAncestors, multiPut, getAncestorsFailed,
get, put, getFailed,
pushQuery, pullQuery, chits, queryFailed,
notify,
@ -60,6 +61,9 @@ func (m *metrics) Initialize(namespace string, registerer prometheus.Registerer)
m.getAccepted = initHistogram(namespace, "get_accepted", registerer, &errs)
m.accepted = initHistogram(namespace, "accepted", registerer, &errs)
m.getAcceptedFailed = initHistogram(namespace, "get_accepted_failed", registerer, &errs)
m.getAncestors = initHistogram(namespace, "get_ancestors", registerer, &errs)
m.multiPut = initHistogram(namespace, "multi_put", registerer, &errs)
m.getAncestorsFailed = initHistogram(namespace, "get_ancestors_failed", registerer, &errs)
m.get = initHistogram(namespace, "get", registerer, &errs)
m.put = initHistogram(namespace, "put", registerer, &errs)
m.getFailed = initHistogram(namespace, "get_failed", registerer, &errs)

View File

@ -36,7 +36,9 @@ type ExternalRouter interface {
GetAccepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte)
PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set)
@ -47,5 +49,6 @@ type InternalRouter interface {
GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
}

View File

@ -186,6 +186,20 @@ func (sr *ChainRouter) Get(validatorID ids.ShortID, chainID ids.ID, requestID ui
}
}
// GetAncestors routes an incoming GetAncestors message from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
// The maximum number of ancestors to respond with is defined in snow/engine/common/bootstrapper.go
func (sr *ChainRouter) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
sr.lock.RLock()
defer sr.lock.RUnlock()
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetAncestors(validatorID, requestID, containerID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
}
}
// Put routes an incoming Put request from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
@ -202,6 +216,22 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui
}
}
// MultiPut routes an incoming MultiPut message from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
sr.lock.RLock()
defer sr.lock.RUnlock()
// This message came in response to a GetAncestors message from this node, and when we sent that
// message we set a timeout. Since we got a response, cancel the timeout.
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.MultiPut(validatorID, requestID, containers)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
}
}
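The Cancel call is one half of a contract set up in the sender: every outgoing GetAncestors registers a timeout that will fire GetAncestorsFailed, and a MultiPut that arrives first must cancel that timeout so the engine sees exactly one of the two outcomes per request. A toy sketch of the pairing, with a hypothetical registry standing in for the real timeout manager:

package main

import (
	"fmt"
	"sync"
)

// registry is a toy stand-in for the timeout manager shared by the sender
// and router: one pending failure callback per outstanding request.
type registry struct {
	mu      sync.Mutex
	pending map[string]func()
}

// Register installs the callback to run if no response ever arrives.
func (r *registry) Register(key string, onTimeout func()) {
	r.mu.Lock()
	defer r.mu.Unlock()
	r.pending[key] = onTimeout
}

// Cancel drops the callback: a response arrived, so the failure path
// must not fire for this request.
func (r *registry) Cancel(key string) {
	r.mu.Lock()
	defer r.mu.Unlock()
	delete(r.pending, key)
}

func main() {
	r := &registry{pending: map[string]func(){}}
	r.Register("vdr1/chainX/7", func() { fmt.Println("GetAncestorsFailed(7)") })
	// A MultiPut for request 7 arrives: cancel before dispatching it.
	r.Cancel("vdr1/chainX/7")
	fmt.Println(len(r.pending)) // 0: the timeout can no longer fire
}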
// GetFailed routes an incoming GetFailed message from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
@ -216,6 +246,20 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
}
}
// GetAncestorsFailed routes an incoming GetAncestorsFailed message from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetAncestorsFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
}
}
// PushQuery routes an incoming PushQuery request from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) PushQuery(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {

View File

@ -15,7 +15,10 @@ type ExternalSender interface {
Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte)
PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID)

View File

@ -93,6 +93,20 @@ func (s *Sender) Get(validatorID ids.ShortID, requestID uint32, containerID ids.
s.sender.Get(validatorID, s.ctx.ChainID, requestID, containerID)
}
// GetAncestors sends a GetAncestors message
func (s *Sender) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
s.ctx.Log.Verbo("Sending GetAncestors to validator %s. RequestID: %d. ContainerID: %s", validatorID, requestID, containerID)
// Sending a GetAncestors to myself will always fail
if validatorID.Equals(s.ctx.NodeID) {
go s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
return
}
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
})
s.sender.GetAncestors(validatorID, s.ctx.ChainID, requestID, containerID)
}
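Note that the self-send guard above reports the failure on a fresh goroutine rather than calling the router inline, presumably so the router isn't re-entered from the sender's own call stack while locks may be held. A toy sketch of that guard; the names here are illustrative, not the real API:

package main

import (
	"fmt"
	"sync"
)

// sendRequest fails a self-addressed request immediately, but does so on a
// new goroutine so the failure handler never runs on the caller's stack.
func sendRequest(nodeID, target string, onFail func(), wg *sync.WaitGroup) {
	if target == nodeID {
		wg.Add(1)
		go func() {
			defer wg.Done()
			onFail()
		}()
		return
	}
	fmt.Println("request sent to", target)
}

func main() {
	var wg sync.WaitGroup
	sendRequest("self", "self", func() { fmt.Println("GetAncestorsFailed fired") }, &wg)
	sendRequest("self", "peer", nil, &wg)
	wg.Wait()
}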
// Put sends a Put message to the consensus engine running on the specified chain
// on the specified validator.
// The Put message signifies that this consensus engine is giving to the recipient
@ -102,6 +116,14 @@ func (s *Sender) Put(validatorID ids.ShortID, requestID uint32, containerID ids.
s.sender.Put(validatorID, s.ctx.ChainID, requestID, containerID, container)
}
// MultiPut sends a MultiPut message to the consensus engine running on the specified chain
// on the specified validator.
// The MultiPut message gives the recipient the contents of several containers.
func (s *Sender) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) {
s.ctx.Log.Verbo("Sending MultiPut to validator %s. RequestID: %d. NumContainers: %d", validatorID, requestID, len(containers))
s.sender.MultiPut(validatorID, s.ctx.ChainID, requestID, containers)
}
// PushQuery sends a PushQuery message to the consensus engines running on the specified chains
// on the specified validators.
// The PushQuery message signifies that this consensus engine would like each validator to send

View File

@ -16,7 +16,7 @@ type ExternalSenderTest struct {
CantGetAcceptedFrontier, CantAcceptedFrontier,
CantGetAccepted, CantAccepted,
CantGet, CantGetAncestors, CantPut, CantMultiPut,
CantPullQuery, CantPushQuery, CantChits,
CantGossip bool
@ -24,8 +24,9 @@ type ExternalSenderTest struct {
AcceptedFrontierF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
GetAcceptedF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set)
AcceptedF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
GetF, GetAncestorsF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
PutF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
MultiPutF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte)
PushQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID)
ChitsF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set)
@ -39,7 +40,9 @@ func (s *ExternalSenderTest) Default(cant bool) {
s.CantGetAccepted = cant
s.CantAccepted = cant
s.CantGet = cant
s.CantGetAncestors = cant
s.CantPut = cant
s.CantMultiPut = cant
s.CantPullQuery = cant
s.CantPushQuery = cant
s.CantChits = cant
@ -111,6 +114,19 @@ func (s *ExternalSenderTest) Get(vdr ids.ShortID, chainID ids.ID, requestID uint
}
}
// GetAncestors calls GetAncestorsF if it was initialized. If it wasn't
// initialized, this function shouldn't be called, and testing was
// initialized, then testing will fail.
func (s *ExternalSenderTest) GetAncestors(vdr ids.ShortID, chainID ids.ID, requestID uint32, vtxID ids.ID) {
if s.GetAncestorsF != nil {
s.GetAncestorsF(vdr, chainID, requestID, vtxID)
} else if s.CantGetAncestors && s.T != nil {
s.T.Fatalf("Unexpectedly called GetAncestors")
} else if s.CantGetAncestors && s.B != nil {
s.B.Fatalf("Unexpectedly called GetAncestors")
}
}
// Put calls PutF if it was initialized. If it wasn't initialized, this
// function shouldn't be called, and testing was initialized, then testing
// will fail.
@ -124,6 +140,19 @@ func (s *ExternalSenderTest) Put(vdr ids.ShortID, chainID ids.ID, requestID uint
}
}
// MultiPut calls MultiPutF if it was initialized. If it wasn't initialized,
// this function shouldn't be called, and testing was initialized, then
// testing will fail.
func (s *ExternalSenderTest) MultiPut(vdr ids.ShortID, chainID ids.ID, requestID uint32, vtxs [][]byte) {
if s.MultiPutF != nil {
s.MultiPutF(vdr, chainID, requestID, vtxs)
} else if s.CantMultiPut && s.T != nil {
s.T.Fatalf("Unexpectedly called MultiPut")
} else if s.CantMultiPut && s.B != nil {
s.B.Fatalf("Unexpectedly called MultiPut")
}
}
// PushQuery calls PushQueryF if it was initialized. If it wasn't initialized
// and this function shouldn't be called and testing was initialized, then
// testing will fail.

View File

@ -256,6 +256,24 @@ func (p *Packer) UnpackFixedByteSlices(size int) [][]byte {
return bytes
}
// Pack2DByteSlice appends a 2D byte slice to the byte array
func (p *Packer) Pack2DByteSlice(byteSlices [][]byte) {
p.PackInt(uint32(len(byteSlices)))
for _, bytes := range byteSlices {
p.PackBytes(bytes)
}
}
// Unpack2DByteSlice returns a 2D byte slice from the byte array.
func (p *Packer) Unpack2DByteSlice() [][]byte {
sliceSize := p.UnpackInt()
bytes := [][]byte(nil)
for i := uint32(0); i < sliceSize && !p.Errored(); i++ {
bytes = append(bytes, p.UnpackBytes())
}
return bytes
}
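Concretely, Pack2DByteSlice writes a 4-byte element count followed by each inner slice in the usual length-prefixed PackBytes layout, so an empty outer slice costs exactly four bytes on the wire. A hand-rolled sketch of the same layout, assuming the Packer's big-endian integer encoding:

package main

import (
	"encoding/binary"
	"fmt"
)

// pack2D mirrors Pack2DByteSlice's layout: a uint32 count, then each inner
// slice as a uint32 length prefix followed by its raw bytes.
func pack2D(slices [][]byte) []byte {
	out := make([]byte, 4)
	binary.BigEndian.PutUint32(out, uint32(len(slices)))
	for _, s := range slices {
		var l [4]byte
		binary.BigEndian.PutUint32(l[:], uint32(len(s)))
		out = append(out, l[:]...)
		out = append(out, s...)
	}
	return out
}

func main() {
	b := pack2D([][]byte{{0xaa}, {0xbb, 0xcc}})
	fmt.Printf("% x\n", b) // 00 00 00 02 00 00 00 01 aa 00 00 00 02 bb cc
}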
// PackStr appends a string to the byte array
func (p *Packer) PackStr(str string) {
strSize := len(str)
@ -432,6 +450,20 @@ func TryUnpackBytes(packer *Packer) interface{} {
return packer.UnpackBytes()
}
// TryPack2DBytes attempts to pack the value as a 2D byte slice
func TryPack2DBytes(packer *Packer, valIntf interface{}) {
if val, ok := valIntf.([][]byte); ok {
packer.Pack2DByteSlice(val)
} else {
packer.Add(errBadType)
}
}
// TryUnpack2DBytes attempts to unpack the value as a 2D byte slice
func TryUnpack2DBytes(packer *Packer) interface{} {
return packer.Unpack2DByteSlice()
}
// TryPackStr attempts to pack the value as a string
func TryPackStr(packer *Packer, valIntf interface{}) {
if val, ok := valIntf.(string); ok {

View File

@ -506,3 +506,63 @@ func TestPackerUnpackBool(t *testing.T) {
t.Fatalf("Packer.UnpackBool returned %t, expected sentinal value %t", actual, BoolSentinal)
}
}
func TestPacker2DByteSlice(t *testing.T) {
// Case: empty array
p := Packer{MaxSize: 1024}
arr := [][]byte{}
p.Pack2DByteSlice(arr)
if p.Errored() {
t.Fatal(p.Err)
}
p = Packer{MaxSize: 1024, Bytes: p.Bytes}
arrUnpacked := p.Unpack2DByteSlice()
if len(arrUnpacked) != 0 {
t.Fatal("should be empty")
}
// Case: Array has one element
p = Packer{MaxSize: 1024}
arr = [][]byte{
[]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
}
p.Pack2DByteSlice(arr)
if p.Errored() {
t.Fatal(p.Err)
}
p = Packer{MaxSize: 1024, Bytes: p.Bytes}
arrUnpacked = p.Unpack2DByteSlice()
if p.Errored() {
t.Fatal(p.Err)
}
if l := len(arrUnpacked); l != 1 {
t.Fatalf("should be length 1 but is length %d", l)
}
if !bytes.Equal(arrUnpacked[0], arr[0]) {
t.Fatal("should match")
}
// Case: Array has multiple elements
p = Packer{MaxSize: 1024}
arr = [][]byte{
[]byte{1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
[]byte{11, 12, 3, 4, 5, 6, 7, 8, 9, 10},
}
p.Pack2DByteSlice(arr)
if p.Errored() {
t.Fatal(p.Err)
}
p = Packer{MaxSize: 1024, Bytes: p.Bytes}
arrUnpacked = p.Unpack2DByteSlice()
if p.Errored() {
t.Fatal(p.Err)
}
if l := len(arrUnpacked); l != 2 {
t.Fatalf("should be length 1 but is length %d", l)
}
if !bytes.Equal(arrUnpacked[0], arr[0]) {
t.Fatal("should match")
}
if !bytes.Equal(arrUnpacked[1], arr[1]) {
t.Fatal("should match")
}
}

View File

@ -1551,8 +1551,9 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
advanceTimePreference := advanceTimeBlk.Options()[0]
peerID := ids.NewShortID([20]byte{1, 2, 3, 4, 5, 4, 3, 2, 1})
vdrs := validators.NewSet()
vdrs.Add(validators.NewValidator(ctx.NodeID, 1))
vdrs.Add(validators.NewValidator(peerID, 1))
beacons := vdrs
timeoutManager := timeout.Manager{}
@ -1623,23 +1624,23 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
frontier := ids.Set{}
frontier.Add(advanceTimeBlkID)
engine.AcceptedFrontier(peerID, *reqID, frontier)
externalSender.GetAcceptedF = nil
externalSender.GetAncestorsF = func(_ ids.ShortID, _ ids.ID, requestID uint32, containerID ids.ID) {
*reqID = requestID
if !containerID.Equals(advanceTimeBlkID) {
t.Fatalf("wrong block requested")
}
}
engine.Accepted(peerID, *reqID, frontier)
externalSender.GetF = nil
externalSender.CantPushQuery = false
externalSender.CantPullQuery = false
engine.MultiPut(peerID, *reqID, [][]byte{advanceTimeBlkBytes})
externalSender.CantPushQuery = true