mirror of https://github.com/poanetwork/gecko.git
address (almost all) PR comments...still need to do final pass
This commit is contained in:
parent
8f5a6b1db2
commit
654f1c103b
|
@ -171,15 +171,14 @@ const (
|
||||||
AcceptedFrontier
|
AcceptedFrontier
|
||||||
GetAccepted
|
GetAccepted
|
||||||
Accepted
|
Accepted
|
||||||
|
GetAncestors
|
||||||
|
MultiPut
|
||||||
// Consensus:
|
// Consensus:
|
||||||
Get
|
Get
|
||||||
Put
|
Put
|
||||||
PushQuery
|
PushQuery
|
||||||
PullQuery
|
PullQuery
|
||||||
Chits
|
Chits
|
||||||
// Bootstrapping
|
|
||||||
GetAncestors
|
|
||||||
MultiPut
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Defines the messages that can be sent/received with this network
|
// Defines the messages that can be sent/received with this network
|
||||||
|
|
|
@ -375,7 +375,10 @@ func (n *network) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestI
|
||||||
sent = peer.send(msg)
|
sent = peer.send(msg)
|
||||||
}
|
}
|
||||||
if !sent {
|
if !sent {
|
||||||
|
n.getAncestors.numFailed.Inc()
|
||||||
n.log.Debug("failed to send a GetAncestors message to: %s", validatorID)
|
n.log.Debug("failed to send a GetAncestors message to: %s", validatorID)
|
||||||
|
} else {
|
||||||
|
n.getAncestors.numSent.Inc()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -5,7 +5,6 @@ package avalanche
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
|
|
||||||
"github.com/ava-labs/gecko/cache"
|
"github.com/ava-labs/gecko/cache"
|
||||||
"github.com/ava-labs/gecko/ids"
|
"github.com/ava-labs/gecko/ids"
|
||||||
|
@ -96,8 +95,13 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
|
||||||
return acceptedVtxIDs
|
return acceptedVtxIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a vertex and its ancestors
|
// Get vertex [vtxID] and its ancestors
|
||||||
func (b *bootstrapper) fetch(vtxID ids.ID) error {
|
func (b *bootstrapper) fetch(vtxID ids.ID) error {
|
||||||
|
// Make sure we haven't already requested this block
|
||||||
|
if b.outstandingRequests.Contains(vtxID) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure we don't already have this vertex
|
// Make sure we don't already have this vertex
|
||||||
if _, err := b.State.GetVertex(vtxID); err == nil {
|
if _, err := b.State.GetVertex(vtxID); err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -169,7 +173,7 @@ func (b *bootstrapper) process(vtx avalanche.Vertex) error {
|
||||||
for _, parent := range vtx.Parents() {
|
for _, parent := range vtx.Parents() {
|
||||||
toProcess = append(toProcess, parent)
|
toProcess = append(toProcess, parent)
|
||||||
}
|
}
|
||||||
b.processedCache.Put(vtx.ID(), vtx.ID())
|
b.processedCache.Put(vtx.ID(), nil)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if numPending := b.outstandingRequests.Len(); numPending == 0 {
|
if numPending := b.outstandingRequests.Len(); numPending == 0 {
|
||||||
|
@ -178,90 +182,52 @@ func (b *bootstrapper) process(vtx avalanche.Vertex) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put handles the receipt of a vertex and processes it
|
|
||||||
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
|
|
||||||
vtx, err := b.State.ParseVertex(vtxBytes) // Persists the vtx. vtx.Status() not Unknown.
|
|
||||||
if err != nil {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Failed to parse vertex: %w", err)
|
|
||||||
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
|
||||||
parsedVtxID := vtx.ID() // Actual ID of the vertex we just got
|
|
||||||
|
|
||||||
// The validator that sent this message said the ID of the vertex inside was [vtxID]
|
|
||||||
// but actually it's [parsedVtxID]
|
|
||||||
if !parsedVtxID.Equals(vtxID) {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("expected Put from %s to contain %s but contains %s. Request ID: %d", vdr, vtxID, parsedVtxID, requestID)
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedVtxID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
|
||||||
if !ok { // there was no outstanding request from this validator for a request with this ID
|
|
||||||
if requestID != math.MaxUint32 { // request ID of math.MaxUint32 means the put was a gossip message. In that case, just return.
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Unexpected Put. There is no outstanding request to %s with request ID %d", vdr, requestID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !expectedVtxID.Equals(parsedVtxID) {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Put(%s, %d) contains vertex %s but should contain vertex %s.", vdr, requestID, parsedVtxID, expectedVtxID)
|
|
||||||
b.outstandingRequests.Add(vdr, requestID, expectedVtxID) // Just going to be removed by GetFailed
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return b.process(vtx) // Process this vtx
|
|
||||||
}
|
|
||||||
|
|
||||||
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
|
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
|
||||||
// with request ID [requestID]
|
// with request ID [requestID]
|
||||||
func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte) error {
|
func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte) error {
|
||||||
b.BootstrapConfig.Context.Log.Verbo("in MultiPut(%s, %d). len(vtxs): %d", vdr, requestID, len(vtxs)) // TODO remove
|
if lenVtxs := len(vtxs); lenVtxs > common.MaxContainersPerMultiPut {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains more than maximum number of vertices", vdr, requestID)
|
||||||
|
return b.GetAncestorsFailed(vdr, requestID)
|
||||||
|
} else if lenVtxs == 0 {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains no vertices", vdr, requestID)
|
||||||
|
return b.GetAncestorsFailed(vdr, requestID)
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure this is in response to a request we made
|
// Make sure this is in response to a request we made
|
||||||
neededVtxID, needed := b.outstandingRequests.Remove(vdr, requestID)
|
neededVtxID, needed := b.outstandingRequests.Remove(vdr, requestID)
|
||||||
if !needed { // this message isn't in response to a request for a vertex we need
|
if !needed { // this message isn't in response to a request we made
|
||||||
if _, requested := b.outstandingRequests.Remove(vdr, requestID); !requested { // this message isn't in response to a request for a vertex we greedily requested
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
|
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
neededVtx, err := b.State.ParseVertex(vtxs[0]) // the vertex we requested
|
||||||
|
if err != nil {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("Failed to parse requested vertex %s: %w", neededVtxID, err)
|
||||||
|
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxs[0]})
|
||||||
|
return b.fetch(neededVtxID)
|
||||||
|
} else if actualID := neededVtx.ID(); !actualID.Equals(neededVtxID) {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("expected the first block to be the requested block, %s, but is %s", neededVtxID, actualID)
|
||||||
|
return b.fetch(neededVtxID)
|
||||||
}
|
}
|
||||||
|
|
||||||
var neededVtx avalanche.Vertex = nil // the vertex that this MultiPut is in response to
|
for _, vtxBytes := range vtxs {
|
||||||
for i, vtxBytes := range vtxs {
|
if _, err := b.State.ParseVertex(vtxBytes); err != nil { // Persists the vtx
|
||||||
if i > common.MaxContainersPerMultiPut {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("MultiPut from %s contains more than maximum number of vertices. Request ID: %d", vdr, requestID)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
vtx, err := b.State.ParseVertex(vtxBytes) // Persists the vtx
|
|
||||||
if err != nil {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Failed to parse vertex: %w", err)
|
b.BootstrapConfig.Context.Log.Debug("Failed to parse vertex: %w", err)
|
||||||
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
||||||
}
|
}
|
||||||
if vtx.ID().Equals(neededVtxID) {
|
|
||||||
neededVtx = vtx // found the vtx we wanted
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if !needed {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// This MultiPut was supposed to include [neededVtxID] but it didn't
|
|
||||||
if neededVtx == nil {
|
|
||||||
b.outstandingRequests.Add(vdr, requestID, neededVtxID) // immediately removed by getFailed
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.process(neededVtx)
|
return b.process(neededVtx)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFailed is called when a Get message we sent fails
|
// GetAncestorsFailed is called when a GetAncestors message we sent fails
|
||||||
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
func (b *bootstrapper) GetAncestorsFailed(vdr ids.ShortID, requestID uint32) error {
|
||||||
vtxID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
vtxID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
||||||
if !ok {
|
if !ok {
|
||||||
b.BootstrapConfig.Context.Log.Debug("GetFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
|
b.BootstrapConfig.Context.Log.Debug("GetAncestorsFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// Send another request for this
|
// Send another request for the vertex
|
||||||
return b.fetch(vtxID)
|
return b.fetch(vtxID)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -269,7 +235,9 @@ func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
||||||
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
||||||
for _, vtxID := range acceptedContainerIDs.List() {
|
for _, vtxID := range acceptedContainerIDs.List() {
|
||||||
if vtx, err := b.State.GetVertex(vtxID); err == nil {
|
if vtx, err := b.State.GetVertex(vtxID); err == nil {
|
||||||
b.process(vtx)
|
if err := b.process(vtx); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
} else if err := b.fetch(vtxID); err != nil {
|
} else if err := b.fetch(vtxID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
|
@ -22,7 +22,7 @@ import (
|
||||||
const (
|
const (
|
||||||
// TODO define this constant in one place rather than here and in snowman
|
// TODO define this constant in one place rather than here and in snowman
|
||||||
// Max containers size in a MultiPut message
|
// Max containers size in a MultiPut message
|
||||||
maxContainersLen = int(4 / 5 * network.DefaultMaxMessageSize)
|
maxContainersLen = int(4 * network.DefaultMaxMessageSize / 5)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Transitive implements the Engine interface by attempting to fetch all
|
// Transitive implements the Engine interface by attempting to fetch all
|
||||||
|
@ -122,49 +122,43 @@ func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error
|
||||||
|
|
||||||
// GetAncestors implements the Engine interface
|
// GetAncestors implements the Engine interface
|
||||||
func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
|
func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.ID) error {
|
||||||
startTime := time.Now() // TODO remove
|
startTime := time.Now()
|
||||||
t.Config.Context.Log.Verbo("In GetAncestors. Validator: %s, request ID: %d, vtxID: %s", vdr, requestID, vtxID) // TODO remove
|
t.Config.Context.Log.Verbo("In GetAncestors. Validator: %s, request ID: %d, vtxID: %s", vdr, requestID, vtxID)
|
||||||
vertex, err := t.Config.State.GetVertex(vtxID)
|
vertex, err := t.Config.State.GetVertex(vtxID)
|
||||||
if err != nil || vertex.Status() == choices.Unknown {
|
if err != nil || vertex.Status() == choices.Unknown {
|
||||||
t.Config.Context.Log.Info("dropping getAncestors")
|
t.Config.Context.Log.Verbo("dropping getAncestors")
|
||||||
return nil // Don't have the requested vertex. Drop message.
|
return nil // Don't have the requested vertex. Drop message.
|
||||||
}
|
}
|
||||||
|
|
||||||
// vertex and its ancestors. First element is vertex.
|
queue := make([]avalanche.Vertex, 1, common.MaxContainersPerMultiPut) // for BFS
|
||||||
// Further back elements are further back ancestors
|
queue[0] = vertex
|
||||||
ancestors := []avalanche.Vertex{}
|
ancestorsBytesLen := len(vertex.Bytes()) // length, in bytes, of vertex and its ancestors
|
||||||
queue := []avalanche.Vertex{vertex} // for BFS
|
ancestorsBytes := make([][]byte, 0, common.MaxContainersPerMultiPut) // vertex and its ancestors in BFS order
|
||||||
beenInQueue := ids.Set{} // IDs of vertices that have been in queue before
|
visited := ids.Set{} // IDs of vertices that have been in queue before
|
||||||
beenInQueue.Add(vertex.ID())
|
visited.Add(vertex.ID())
|
||||||
for len(ancestors) < common.MaxContainersPerMultiPut && len(queue) > 0 && time.Since(startTime) < common.MaxTimeFetchingAncestors {
|
|
||||||
|
for len(ancestorsBytes) < common.MaxContainersPerMultiPut && len(queue) > 0 && time.Since(startTime) < common.MaxTimeFetchingAncestors {
|
||||||
var vtx avalanche.Vertex
|
var vtx avalanche.Vertex
|
||||||
vtx, queue = queue[0], queue[1:] // pop
|
vtx, queue = queue[0], queue[1:] // pop
|
||||||
ancestors = append(ancestors, vtx)
|
vtxBytes := vtx.Bytes()
|
||||||
|
if newLen := ancestorsBytesLen + len(vtxBytes); newLen < maxContainersLen {
|
||||||
|
ancestorsBytes = append(ancestorsBytes, vtxBytes)
|
||||||
|
ancestorsBytesLen = newLen
|
||||||
|
} else { // reached maximum response size
|
||||||
|
break
|
||||||
|
}
|
||||||
for _, parent := range vtx.Parents() {
|
for _, parent := range vtx.Parents() {
|
||||||
if parent.Status() == choices.Unknown { // Don't have this vertex...ignore
|
if parent.Status() == choices.Unknown { // Don't have this vertex;ignore
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
parentID := parent.ID()
|
if parentID := parent.ID(); !visited.Contains(parentID) { // Already visited; ignore
|
||||||
if !beenInQueue.Contains(parentID) { // Don't add same vertex twice
|
|
||||||
queue = append(queue, parent)
|
queue = append(queue, parent)
|
||||||
beenInQueue.Add(parentID)
|
visited.Add(parentID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
containersBytesLen := 0
|
t.Config.Sender.MultiPut(vdr, requestID, ancestorsBytes)
|
||||||
containersBytes := [][]byte{}
|
|
||||||
for i := len(ancestors) - 1; i >= 0; i-- {
|
|
||||||
bytes := ancestors[i].Bytes()
|
|
||||||
if newLen := containersBytesLen + len(bytes); newLen > maxContainersLen {
|
|
||||||
containersBytes = append(containersBytes, bytes)
|
|
||||||
containersBytesLen = newLen
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Config.Context.Log.Info("GetAncestors call took %v", time.Since(startTime)) // TODO remove
|
|
||||||
t.Config.Sender.MultiPut(vdr, requestID, containersBytes)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -172,8 +166,9 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, vtxID ids.I
|
||||||
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
|
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
|
||||||
t.Config.Context.Log.Verbo("Put called for vertexID %s", vtxID)
|
t.Config.Context.Log.Verbo("Put called for vertexID %s", vtxID)
|
||||||
|
|
||||||
if !t.bootstrapped {
|
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
|
||||||
return t.bootstrapper.Put(vdr, requestID, vtxID, vtxBytes)
|
t.Config.Context.Log.Debug("Dropping Put for %s due to bootstrapping", vtxID)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
vtx, err := t.Config.State.ParseVertex(vtxBytes)
|
vtx, err := t.Config.State.ParseVertex(vtxBytes)
|
||||||
|
@ -189,8 +184,9 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxByt
|
||||||
|
|
||||||
// GetFailed implements the Engine interface
|
// GetFailed implements the Engine interface
|
||||||
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
||||||
if !t.bootstrapped {
|
if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid
|
||||||
return t.bootstrapper.GetFailed(vdr, requestID)
|
t.Config.Context.Log.Debug("Dropping GetFailed($s, %d) due to bootstrapping", vdr, requestID)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
vtxID, ok := t.vtxReqs.Remove(vdr, requestID)
|
vtxID, ok := t.vtxReqs.Remove(vdr, requestID)
|
||||||
|
|
|
@ -136,7 +136,6 @@ type FetchHandler interface {
|
||||||
Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
|
Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) error
|
||||||
|
|
||||||
// Notify this engine of a request for a container and its ancestors.
|
// Notify this engine of a request for a container and its ancestors.
|
||||||
//
|
|
||||||
// The request is from validator [validatorID]. The requested container is [containerID].
|
// The request is from validator [validatorID]. The requested container is [containerID].
|
||||||
//
|
//
|
||||||
// This function can be called by any validator. It is not safe to assume
|
// This function can be called by any validator. It is not safe to assume
|
||||||
|
@ -144,8 +143,8 @@ type FetchHandler interface {
|
||||||
// assume the requested containerID exists. However, the validatorID is
|
// assume the requested containerID exists. However, the validatorID is
|
||||||
// assumed to be authenticated.
|
// assumed to be authenticated.
|
||||||
//
|
//
|
||||||
// This engine should respond with a MultiPut message with the same requestID, which contains [containerID]
|
// This engine should respond with a MultiPut message with the same requestID,
|
||||||
// as well as its ancestors.
|
// which contains [containerID] as well as its ancestors. See MultiPut's documentation.
|
||||||
//
|
//
|
||||||
// If this engine doesn't have some ancestors, it should reply with its best effort attempt at getting them.
|
// If this engine doesn't have some ancestors, it should reply with its best effort attempt at getting them.
|
||||||
// If this engine doesn't have [containerID] it can ignore this message.
|
// If this engine doesn't have [containerID] it can ignore this message.
|
||||||
|
@ -171,9 +170,11 @@ type FetchHandler interface {
|
||||||
// Notify this engine of multiple containers.
|
// Notify this engine of multiple containers.
|
||||||
// Each element of [containers] is the byte representation of a container.
|
// Each element of [containers] is the byte representation of a container.
|
||||||
//
|
//
|
||||||
// This should only be called during bootstrapping, and only in response to a GetAncestors
|
// This should only be called during bootstrapping, and in response to a GetAncestors message to
|
||||||
// call to validator [validatorID] with request ID [requestID]. This call should contain
|
// [validatorID] with request ID [requestID]. This call should contain the container requested in
|
||||||
// the container requested in that message, along with ancestors.
|
// that message, along with ancestors.
|
||||||
|
// The containers should be in BFS order (ie the first container must be the container
|
||||||
|
// requested in the GetAncestors message and further back ancestors are later in [containers]
|
||||||
//
|
//
|
||||||
// It is not safe to assume this message is in response to a GetAncestor message, that this
|
// It is not safe to assume this message is in response to a GetAncestor message, that this
|
||||||
// message has a unique requestID or that any of the containers in [containers] are valid.
|
// message has a unique requestID or that any of the containers in [containers] are valid.
|
||||||
|
|
|
@ -5,7 +5,6 @@ package snowman
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
"math"
|
|
||||||
|
|
||||||
"github.com/ava-labs/gecko/ids"
|
"github.com/ava-labs/gecko/ids"
|
||||||
"github.com/ava-labs/gecko/snow/choices"
|
"github.com/ava-labs/gecko/snow/choices"
|
||||||
|
@ -16,8 +15,6 @@ import (
|
||||||
"github.com/prometheus/client_golang/prometheus"
|
"github.com/prometheus/client_golang/prometheus"
|
||||||
)
|
)
|
||||||
|
|
||||||
const ()
|
|
||||||
|
|
||||||
// BootstrapConfig ...
|
// BootstrapConfig ...
|
||||||
type BootstrapConfig struct {
|
type BootstrapConfig struct {
|
||||||
common.Config
|
common.Config
|
||||||
|
@ -60,14 +57,14 @@ func (b *bootstrapper) Initialize(config BootstrapConfig) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CurrentAcceptedFrontier ...
|
// CurrentAcceptedFrontier returns the last accepted block
|
||||||
func (b *bootstrapper) CurrentAcceptedFrontier() ids.Set {
|
func (b *bootstrapper) CurrentAcceptedFrontier() ids.Set {
|
||||||
acceptedFrontier := ids.Set{}
|
acceptedFrontier := ids.Set{}
|
||||||
acceptedFrontier.Add(b.VM.LastAccepted())
|
acceptedFrontier.Add(b.VM.LastAccepted())
|
||||||
return acceptedFrontier
|
return acceptedFrontier
|
||||||
}
|
}
|
||||||
|
|
||||||
// FilterAccepted ...
|
// FilterAccepted returns the blocks in [containerIDs] that we have accepted
|
||||||
func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
|
func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
|
||||||
acceptedIDs := ids.Set{}
|
acceptedIDs := ids.Set{}
|
||||||
for _, blkID := range containerIDs.List() {
|
for _, blkID := range containerIDs.List() {
|
||||||
|
@ -82,7 +79,9 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
|
||||||
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
||||||
for _, blkID := range acceptedContainerIDs.List() {
|
for _, blkID := range acceptedContainerIDs.List() {
|
||||||
if blk, err := b.VM.GetBlock(blkID); err == nil {
|
if blk, err := b.VM.GetBlock(blkID); err == nil {
|
||||||
b.process(blk)
|
if err := b.process(blk); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
} else if err := b.fetch(blkID); err != nil {
|
} else if err := b.fetch(blkID); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -96,8 +95,13 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get a block and its ancestors
|
// Get block [blkID] and its ancestors from a validator
|
||||||
func (b *bootstrapper) fetch(blkID ids.ID) error {
|
func (b *bootstrapper) fetch(blkID ids.ID) error {
|
||||||
|
// Make sure we haven't already requested this block
|
||||||
|
if b.outstandingRequests.Contains(blkID) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure we don't already have this block
|
// Make sure we don't already have this block
|
||||||
if _, err := b.VM.GetBlock(blkID); err == nil {
|
if _, err := b.VM.GetBlock(blkID); err == nil {
|
||||||
return nil
|
return nil
|
||||||
|
@ -115,81 +119,48 @@ func (b *bootstrapper) fetch(blkID ids.ID) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put ...
|
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
|
||||||
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error {
|
// with request ID [requestID]
|
||||||
b.BootstrapConfig.Context.Log.Verbo("Put called for blkID %s", blkID)
|
|
||||||
|
|
||||||
vtx, err := b.VM.ParseBlock(blkBytes) // Persists the vtx. vtx.Status() not Unknown.
|
|
||||||
if err != nil {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Failed to parse block: %w", err)
|
|
||||||
b.BootstrapConfig.Context.Log.Verbo("block: %s", formatting.DumpBytes{Bytes: blkBytes})
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
|
||||||
parsedBlockID := vtx.ID() // Actual ID of the block we just got
|
|
||||||
|
|
||||||
// The validator that sent this message said the ID of the block inside was [blkID]
|
|
||||||
// but actually it's [parsedBlockID]
|
|
||||||
if !parsedBlockID.Equals(blkID) {
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
|
||||||
|
|
||||||
expectedBlkID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
|
||||||
if !ok { // there was no outstanding request from this validator for a request with this ID
|
|
||||||
if requestID != math.MaxUint32 { // request ID of math.MaxUint32 means the put was a gossip message. In that case, just return.
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Unexpected Put. There is no outstanding request to %s with request ID %d", vdr, requestID)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if !expectedBlkID.Equals(parsedBlockID) {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("Put(%s, %d) contains block %s but should contain block %s.", vdr, requestID, parsedBlockID, expectedBlkID)
|
|
||||||
b.outstandingRequests.Add(vdr, requestID, expectedBlkID) // Just going to be removed by GetFailed
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
|
||||||
|
|
||||||
return b.process(vtx)
|
|
||||||
}
|
|
||||||
|
|
||||||
// MultiPut ...
|
|
||||||
func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, blks [][]byte) error {
|
func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, blks [][]byte) error {
|
||||||
b.BootstrapConfig.Context.Log.Verbo("in MultiPut(%s, %d). len(blks): %d", vdr, requestID, len(blks)) // TODO remove
|
if lenBlks := len(blks); lenBlks > common.MaxContainersPerMultiPut {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains more than maximum number of blocks", vdr, requestID)
|
||||||
|
return b.GetAncestorsFailed(vdr, requestID)
|
||||||
|
} else if lenBlks == 0 {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("MultiPut(%s, %d) contains no blocks", vdr, requestID)
|
||||||
|
return b.GetAncestorsFailed(vdr, requestID)
|
||||||
|
}
|
||||||
|
|
||||||
// Make sure this is in response to a request we made
|
// Make sure this is in response to a request we made
|
||||||
wantedBlkID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
wantedBlkID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
||||||
if !ok {
|
if !ok { // this message isn't in response to a request we made
|
||||||
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
|
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
var wantedBlk snowman.Block = nil // the block that this MultiPut is in response to
|
wantedBlk, err := b.VM.ParseBlock(blks[0]) // the block we requested
|
||||||
for i, blkBytes := range blks {
|
|
||||||
if i > common.MaxContainersPerMultiPut {
|
|
||||||
b.BootstrapConfig.Context.Log.Debug("MultiPut from %s contains more than maximum number of vertices. Request ID: %d", vdr, requestID)
|
|
||||||
break
|
|
||||||
}
|
|
||||||
blk, err := b.VM.ParseBlock(blkBytes) // Persists the blk
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("Failed to parse requested block %s: %w", wantedBlkID, err)
|
||||||
|
return b.fetch(wantedBlkID)
|
||||||
|
} else if actualID := wantedBlk.ID(); !actualID.Equals(wantedBlkID) {
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("expected the first block to be the requested block, %s, but is %s", wantedBlk, actualID)
|
||||||
|
return b.fetch(wantedBlkID)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, blkBytes := range blks {
|
||||||
|
if _, err := b.VM.ParseBlock(blkBytes); err != nil { // persists the block
|
||||||
b.BootstrapConfig.Context.Log.Debug("Failed to parse block: %w", err)
|
b.BootstrapConfig.Context.Log.Debug("Failed to parse block: %w", err)
|
||||||
b.BootstrapConfig.Context.Log.Verbo("block: %s", formatting.DumpBytes{Bytes: blkBytes})
|
b.BootstrapConfig.Context.Log.Verbo("block: %s", formatting.DumpBytes{Bytes: blkBytes})
|
||||||
}
|
}
|
||||||
if blk.ID().Equals(wantedBlkID) {
|
|
||||||
wantedBlk = blk // found the block we wanted
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// This MultiPut was supposed to include [wantedBlkID] but it didn't
|
|
||||||
if wantedBlk == nil {
|
|
||||||
b.outstandingRequests.Add(vdr, requestID, wantedBlkID) // immediately removed by getFailed
|
|
||||||
return b.GetFailed(vdr, requestID)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
return b.process(wantedBlk)
|
return b.process(wantedBlk)
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetFailed is called when a Get message we sent fails
|
// GetAncestorsFailed is called when a GetAncestors message we sent fails
|
||||||
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
func (b *bootstrapper) GetAncestorsFailed(vdr ids.ShortID, requestID uint32) error {
|
||||||
blkID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
blkID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
||||||
if !ok {
|
if !ok {
|
||||||
b.BootstrapConfig.Context.Log.Debug("GetFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
|
b.BootstrapConfig.Context.Log.Debug("GetAncestorsFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
// Send another request for this
|
// Send another request for this
|
||||||
|
@ -198,13 +169,12 @@ func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
||||||
|
|
||||||
// process a block
|
// process a block
|
||||||
func (b *bootstrapper) process(blk snowman.Block) error {
|
func (b *bootstrapper) process(blk snowman.Block) error {
|
||||||
|
|
||||||
status := blk.Status()
|
status := blk.Status()
|
||||||
blkID := blk.ID()
|
blkID := blk.ID()
|
||||||
for status == choices.Processing {
|
for status == choices.Processing {
|
||||||
b.numProcessed++ // Progress tracker
|
b.numProcessed++ // Progress tracker
|
||||||
if b.numProcessed%common.StatusUpdateFrequency == 0 { // Periodically print progress
|
if b.numProcessed%common.StatusUpdateFrequency == 0 { // Periodically print progress
|
||||||
b.BootstrapConfig.Context.Log.Debug("processed %d blocks", b.numProcessed)
|
b.BootstrapConfig.Context.Log.Info("processed %d blocks", b.numProcessed)
|
||||||
}
|
}
|
||||||
if err := b.Blocked.Push(&blockJob{
|
if err := b.Blocked.Push(&blockJob{
|
||||||
numAccepted: b.numBootstrapped,
|
numAccepted: b.numBootstrapped,
|
||||||
|
|
|
@ -20,7 +20,7 @@ import (
|
||||||
const (
|
const (
|
||||||
// TODO define this constant in one place rather than here and in snowman
|
// TODO define this constant in one place rather than here and in snowman
|
||||||
// Max containers size in a MultiPut message
|
// Max containers size in a MultiPut message
|
||||||
maxContainersLen = int(4 / 5 * network.DefaultMaxMessageSize)
|
maxContainersLen = int(4 * network.DefaultMaxMessageSize / 5)
|
||||||
)
|
)
|
||||||
|
|
||||||
// Transitive implements the Engine interface by attempting to fetch all
|
// Transitive implements the Engine interface by attempting to fetch all
|
||||||
|
@ -159,45 +159,35 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, blkID ids.I
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// ancestors[0] is [blk]. ancestors[1] is its parent, ancestors[2] is its grandparent, etc.
|
ancestorsBytes := make([][]byte, 1, common.MaxContainersPerMultiPut) // First elt is byte repr. of blk, then its parents, then grandparent, etc.
|
||||||
ancestors := []snowman.Block{blk}
|
ancestorsBytes[0] = blk.Bytes()
|
||||||
for i := 1; i <= int(common.MaxContainersPerMultiPut); i++ {
|
ancestorsBytesLen := len(blk.Bytes()) // length, in bytes, of all elements of ancestors
|
||||||
if time.Since(startTime) > common.MaxTimeFetchingAncestors {
|
|
||||||
|
for i := 1; i < common.MaxContainersPerMultiPut && time.Since(startTime) > common.MaxTimeFetchingAncestors; i++ {
|
||||||
|
blk = blk.Parent()
|
||||||
|
if blk.Status() == choices.Unknown {
|
||||||
|
t.Config.Context.Log.Debug("couldn't get block %s. dropping GetAncestors from %s. Request ID: %s", blk, vdr, requestID)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
ancestor := ancestors[i-1].Parent()
|
blkBytes := blk.Bytes()
|
||||||
if ancestor.Status() == choices.Unknown {
|
if newLen := ancestorsBytesLen + len(blkBytes); newLen < maxContainersLen {
|
||||||
// Probably failed to fetch because the block we tried to fetch is the genesis block's parent (non-existent)
|
ancestorsBytes = append(ancestorsBytes, blkBytes)
|
||||||
t.Config.Context.Log.Verbo("couldn't get block %s. dropping GetAncestors from %s. Request ID: %s", ancestor, vdr, requestID)
|
ancestorsBytesLen = newLen
|
||||||
|
} else { // reached maximum response size
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
ancestors = append(ancestors, ancestor)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
containersBytesLen := 0
|
t.Config.Sender.MultiPut(vdr, requestID, ancestorsBytes)
|
||||||
containersBytes := [][]byte{}
|
|
||||||
for i := 0; i < len(ancestors); i++ {
|
|
||||||
bytes := ancestors[i].Bytes()
|
|
||||||
if newLen := containersBytesLen + len(bytes); newLen > maxContainersLen {
|
|
||||||
containersBytes = append(containersBytes, bytes)
|
|
||||||
containersBytesLen = newLen
|
|
||||||
} else {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
t.Config.Context.Log.Info("GetAncestors call took %v", time.Since(startTime)) // TODO remove
|
|
||||||
t.Config.Sender.MultiPut(vdr, requestID, containersBytes)
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put implements the Engine interface
|
// Put implements the Engine interface
|
||||||
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error {
|
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error {
|
||||||
t.Config.Context.Log.Verbo("Put called for blockID %s", blkID)
|
// bootstrapping isn't done --> we didn't send any gets --> this put is invalid
|
||||||
|
|
||||||
// if the engine hasn't been bootstrapped, forward the request to the
|
|
||||||
// bootstrapper
|
|
||||||
if !t.bootstrapped {
|
if !t.bootstrapped {
|
||||||
return t.bootstrapper.Put(vdr, requestID, blkID, blkBytes)
|
t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID)
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
blk, err := t.Config.VM.ParseBlock(blkBytes)
|
blk, err := t.Config.VM.ParseBlock(blkBytes)
|
||||||
|
@ -223,10 +213,10 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkByt
|
||||||
|
|
||||||
// GetFailed implements the Engine interface
|
// GetFailed implements the Engine interface
|
||||||
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
||||||
// if the engine hasn't been bootstrapped, forward the request to the
|
// not done bootstrapping --> didn't send a get --> this message is invalid
|
||||||
// bootstrapper
|
|
||||||
if !t.bootstrapped {
|
if !t.bootstrapped {
|
||||||
return t.bootstrapper.GetFailed(vdr, requestID)
|
t.Config.Context.Log.Debug("dropping GetFailed(%s, %d) due to bootstrapping")
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// we don't use the assumption that this function is called after a failed
|
// we don't use the assumption that this function is called after a failed
|
||||||
|
|
Loading…
Reference in New Issue