Merge pull request #120 from ava-labs/bootstrap-comments

Add comments
This commit is contained in:
Stephen Buttolph 2020-06-24 12:05:06 -04:00 committed by GitHub
commit 6791f3489e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
1 changed files with 30 additions and 26 deletions

View File

@ -86,14 +86,15 @@ func (b *bootstrapper) Initialize(config BootstrapConfig) error {
return nil
}
// CurrentAcceptedFrontier ...
// CurrentAcceptedFrontier returns the set of vertices that this node has accepted
// that have no accepted children
func (b *bootstrapper) CurrentAcceptedFrontier() ids.Set {
acceptedFrontier := ids.Set{}
acceptedFrontier.Add(b.State.Edge()...)
return acceptedFrontier
}
// FilterAccepted ...
// FilterAccepted returns the IDs of vertices in [containerIDs] that this node has accepted
func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
acceptedVtxIDs := ids.Set{}
for _, vtxID := range containerIDs.List() {
@ -104,8 +105,9 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
return acceptedVtxIDs
}
// Fetch vertices and their ancestors from the set of vertices that are needed
// to be fetched.
// Add the vertices in [vtxIDs] to the set of vertices that we need to fetch,
// and then fetch vertices (and their ancestors) until either there are no more
// to fetch or we are at the maximum number of outstanding requests.
func (b *bootstrapper) fetch(vtxIDs ...ids.ID) error {
b.needToFetch.Add(vtxIDs...)
for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < common.MaxOutstandingRequests {
@ -130,35 +132,37 @@ func (b *bootstrapper) fetch(vtxIDs ...ids.ID) error {
b.RequestID++
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
}
return b.finish()
}
// Process vertices
// Process the vertices in [vtxs].
func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
// Vertices that we need to process. Store them in a heap for de-deduplication
// and so we always process vertices further down in the DAG first. This helps
// to reduce the number of repeated DAG traversals.
toProcess := newMaxVertexHeap()
for _, vtx := range vtxs {
if _, ok := b.processedCache.Get(vtx.ID()); !ok { // only process if we haven't already
if _, ok := b.processedCache.Get(vtx.ID()); !ok { // only process a vertex if we haven't already
toProcess.Push(vtx)
}
}
for toProcess.Len() > 0 {
vtx := toProcess.Pop()
for toProcess.Len() > 0 { // While there are unprocessed vertices
vtx := toProcess.Pop() // Get an unknown vertex or one furthest down the DAG
vtxID := vtx.ID()
switch vtx.Status() {
case choices.Unknown:
b.needToFetch.Add(vtxID)
b.needToFetch.Add(vtxID) // We don't have this vertex locally. Mark that we need to fetch it.
case choices.Rejected:
b.needToFetch.Remove(vtxID)
b.needToFetch.Remove(vtxID) // We have this vertex locally. Mark that we don't need to fetch it.
return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID())
case choices.Processing:
b.needToFetch.Remove(vtxID)
if err := b.VtxBlocked.Push(&vertexJob{
if err := b.VtxBlocked.Push(&vertexJob{ // Add to queue of vertices to execute when bootstrapping finishes.
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSVtx,
numDropped: b.numBSDroppedVtx,
@ -172,7 +176,7 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked: %s", err)
}
for _, tx := range vtx.Txs() {
for _, tx := range vtx.Txs() { // Add transactions to queue of transactions to execute when bootstrapping finishes.
if err := b.TxBlocked.Push(&txJob{
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSTx,
@ -184,12 +188,12 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked: %s", err)
}
}
for _, parent := range vtx.Parents() {
if _, ok := b.processedCache.Get(parent.ID()); !ok { // already processed this
for _, parent := range vtx.Parents() { // Process the parents of this vertex (traverse up the DAG)
if _, ok := b.processedCache.Get(parent.ID()); !ok { // But only if we haven't processed the parent
toProcess.Push(parent)
}
}
if vtx.Height()%stripeDistance < stripeWidth {
if vtx.Height()%stripeDistance < stripeWidth { // See comment for stripeDistance
b.processedCache.Put(vtx.ID(), nil)
}
}
@ -218,12 +222,12 @@ func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte
// Make sure this is in response to a request we made
neededVtxID, needed := b.outstandingRequests.Remove(vdr, requestID)
if !needed { // this message isn't in response to a request we made
if !needed { // this message isn't in response to a request we made, or is in response to a request that timed out
b.BootstrapConfig.Context.Log.Debug("received unexpected MultiPut from %s with ID %d", vdr, requestID)
return nil
}
neededVtx, err := b.State.ParseVertex(vtxs[0]) // the vertex we requested
neededVtx, err := b.State.ParseVertex(vtxs[0]) // first vertex should be the one we requested in GetAncestors request
if err != nil {
b.BootstrapConfig.Context.Log.Debug("Failed to parse requested vertex %s: %w", neededVtxID, err)
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxs[0]})
@ -233,7 +237,7 @@ func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte
return b.fetch(neededVtxID)
}
processVertices := make([]avalanche.Vertex, 1, len(vtxs))
processVertices := make([]avalanche.Vertex, 1, len(vtxs)) // Process all of the vertices in this message
processVertices[0] = neededVtx
for _, vtxBytes := range vtxs[1:] { // Parse/persist all the vertices
@ -260,38 +264,38 @@ func (b *bootstrapper) GetAncestorsFailed(vdr ids.ShortID, requestID uint32) err
return b.fetch(vtxID)
}
// ForceAccepted ...
// ForceAccepted starts bootstrapping. Process the vertices in [accepterContainerIDs].
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
if err := b.VM.Bootstrapping(); err != nil {
return fmt.Errorf("failed to notify VM that bootstrapping has started: %w",
err)
}
storedVtxs := make([]avalanche.Vertex, 0, acceptedContainerIDs.Len())
toProcess := make([]avalanche.Vertex, 0, acceptedContainerIDs.Len())
for _, vtxID := range acceptedContainerIDs.List() {
if vtx, err := b.State.GetVertex(vtxID); err == nil {
storedVtxs = append(storedVtxs, vtx)
toProcess = append(toProcess, vtx) // Process this vertex.
} else {
b.needToFetch.Add(vtxID)
b.needToFetch.Add(vtxID) // We don't have this vertex. Mark that we have to fetch it.
}
}
return b.process(storedVtxs...)
return b.process(toProcess...)
}
// Finish bootstrapping
func (b *bootstrapper) finish() error {
// If there are outstanding requests for vertices or we still need to fetch vertices, we can't finish
if b.finished || b.outstandingRequests.Len() > 0 || b.needToFetch.Len() > 0 {
return nil
}
b.BootstrapConfig.Context.Log.Info("finished fetching %d vertices. executing transaction state transitions...",
b.numFetched)
if err := b.executeAll(b.TxBlocked, b.numBSBlockedTx); err != nil {
return err
}
b.BootstrapConfig.Context.Log.Info("executing vertex state transitions...")
if err := b.executeAll(b.VtxBlocked, b.numBSBlockedVtx); err != nil {
return err
}