diff --git a/snow/engine/avalanche/bootstrapper.go b/snow/engine/avalanche/bootstrapper.go index 352f40b..d8f1d6b 100644 --- a/snow/engine/avalanche/bootstrapper.go +++ b/snow/engine/avalanche/bootstrapper.go @@ -42,18 +42,16 @@ type bootstrapper struct { metrics common.Bootstrapper - // true if all of the vertices in the original accepted frontier have been processed - processedStartingAcceptedFrontier bool - // number of vertices fetched so far numFetched uint32 // tracks which validators were asked for which containers in which requests outstandingRequests common.Requests - // IDs of vertices that we will send a GetAncestors request for once we are not at the - // max number of outstanding requests - // Invariant: The intersection of needToFetch and outstandingRequests is empty + // IDs of vertices that we will send a GetAncestors request for once we are + // not at the max number of outstanding requests + // Invariant: The intersection of needToFetch and outstandingRequests is + // empty needToFetch ids.Set // Contains IDs of vertices that have recently been processed @@ -108,49 +106,36 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set { return acceptedVtxIDs } -// Calls fetch for a pending vertex if there are any -func (b *bootstrapper) fetchANeededVtx() error { - if b.needToFetch.Len() > 0 { - return b.fetch(b.needToFetch.CappedList(1)[0]) - } - return nil -} +// Fetch vertices and their ancestors from the set of vertices that are needed +// to be fetched. +func (b *bootstrapper) fetch(vtxIDs ...ids.ID) error { + b.needToFetch.Add(vtxIDs...) + for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < common.MaxOutstandingRequests { + vtxID := b.needToFetch.CappedList(1)[0] + b.needToFetch.Remove(vtxID) -// Get vertex [vtxID] and its ancestors. -// If [vtxID] has already been requested or is already fetched, and there are -// unrequested vertices, requests one such vertex instead of [vtxID] -func (b *bootstrapper) fetch(vtxID ids.ID) error { - // Make sure we haven't already requested this block - if b.outstandingRequests.Contains(vtxID) { - return b.fetchANeededVtx() - } - - // Make sure we don't already have this vertex - if _, err := b.State.GetVertex(vtxID); err == nil { - if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier { - return b.finish() + // Make sure we haven't already requested this vertex + if b.outstandingRequests.Contains(vtxID) { + continue } - b.needToFetch.Remove(vtxID) // we have this vertex. no need to request it. - return b.fetchANeededVtx() - } - // If we're already at maximum number of outstanding requests, queue for later - if b.outstandingRequests.Len() >= common.MaxOutstandingRequests { - b.needToFetch.Add(vtxID) - return nil - } + // Make sure we don't already have this vertex + if _, err := b.State.GetVertex(vtxID); err == nil { + continue + } - validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to - if len(validators) == 0 { - return fmt.Errorf("Dropping request for %s as there are no validators", vtxID) - } - validatorID := validators[0].ID() - b.RequestID++ + validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to + if len(validators) == 0 { + return fmt.Errorf("Dropping request for %s as there are no validators", vtxID) + } + validatorID := validators[0].ID() + b.RequestID++ - b.outstandingRequests.Add(validatorID, b.RequestID, vtxID) - b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty - b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors - return nil + b.outstandingRequests.Add(validatorID, b.RequestID, vtxID) + b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty + b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors + } + return b.finish() } // Process vertices @@ -164,14 +149,17 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error { for toProcess.Len() > 0 { vtx := toProcess.Pop() + vtxID := vtx.ID() + switch vtx.Status() { case choices.Unknown: - if err := b.fetch(vtx.ID()); err != nil { - return err - } + b.fetch(vtxID) case choices.Rejected: + b.needToFetch.Remove(vtxID) return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID()) case choices.Processing: + b.needToFetch.Remove(vtxID) + if err := b.VtxBlocked.Push(&vertexJob{ log: b.BootstrapConfig.Context.Log, numAccepted: b.numBSVtx, @@ -216,10 +204,7 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error { return err } - if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier { - return b.finish() - } - return nil + return b.fetch() } // MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr] @@ -263,11 +248,6 @@ func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte } } - // Now there is one less outstanding request; send another if needed - if err := b.fetchANeededVtx(); err != nil { - return err - } - return b.process(processVertices...) } @@ -293,24 +273,19 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error { for _, vtxID := range acceptedContainerIDs.List() { if vtx, err := b.State.GetVertex(vtxID); err == nil { storedVtxs = append(storedVtxs, vtx) - } else if err := b.fetch(vtxID); err != nil { - return err + } else { + b.needToFetch.Add(vtxID) } } if err := b.process(storedVtxs...); err != nil { return err } - b.processedStartingAcceptedFrontier = true - - if numPending := b.outstandingRequests.Len(); numPending == 0 { - return b.finish() - } - return nil + return b.fetch() } // Finish bootstrapping func (b *bootstrapper) finish() error { - if b.finished { + if b.finished || b.outstandingRequests.Len() > 0 || b.needToFetch.Len() > 0 { return nil } b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing transaction state transitions...")