removed mutually recursive functions for fetching

This commit is contained in:
StephenButtolph 2020-06-23 19:41:22 -04:00
parent 561e021e67
commit 8c7934515c
1 changed files with 40 additions and 65 deletions

View File

@ -42,18 +42,16 @@ type bootstrapper struct {
metrics
common.Bootstrapper
// true if all of the vertices in the original accepted frontier have been processed
processedStartingAcceptedFrontier bool
// number of vertices fetched so far
numFetched uint32
// tracks which validators were asked for which containers in which requests
outstandingRequests common.Requests
// IDs of vertices that we will send a GetAncestors request for once we are not at the
// max number of outstanding requests
// Invariant: The intersection of needToFetch and outstandingRequests is empty
// IDs of vertices that we will send a GetAncestors request for once we are
// not at the max number of outstanding requests
// Invariant: The intersection of needToFetch and outstandingRequests is
// empty
needToFetch ids.Set
// Contains IDs of vertices that have recently been processed
@ -108,49 +106,36 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
return acceptedVtxIDs
}
// Calls fetch for a pending vertex if there are any
func (b *bootstrapper) fetchANeededVtx() error {
if b.needToFetch.Len() > 0 {
return b.fetch(b.needToFetch.CappedList(1)[0])
}
return nil
}
// Fetch vertices and their ancestors from the set of vertices that are needed
// to be fetched.
func (b *bootstrapper) fetch(vtxIDs ...ids.ID) error {
b.needToFetch.Add(vtxIDs...)
for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < common.MaxOutstandingRequests {
vtxID := b.needToFetch.CappedList(1)[0]
b.needToFetch.Remove(vtxID)
// Get vertex [vtxID] and its ancestors.
// If [vtxID] has already been requested or is already fetched, and there are
// unrequested vertices, requests one such vertex instead of [vtxID]
func (b *bootstrapper) fetch(vtxID ids.ID) error {
// Make sure we haven't already requested this block
if b.outstandingRequests.Contains(vtxID) {
return b.fetchANeededVtx()
}
// Make sure we don't already have this vertex
if _, err := b.State.GetVertex(vtxID); err == nil {
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
return b.finish()
// Make sure we haven't already requested this vertex
if b.outstandingRequests.Contains(vtxID) {
continue
}
b.needToFetch.Remove(vtxID) // we have this vertex. no need to request it.
return b.fetchANeededVtx()
}
// If we're already at maximum number of outstanding requests, queue for later
if b.outstandingRequests.Len() >= common.MaxOutstandingRequests {
b.needToFetch.Add(vtxID)
return nil
}
// Make sure we don't already have this vertex
if _, err := b.State.GetVertex(vtxID); err == nil {
continue
}
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
if len(validators) == 0 {
return fmt.Errorf("Dropping request for %s as there are no validators", vtxID)
}
validatorID := validators[0].ID()
b.RequestID++
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
if len(validators) == 0 {
return fmt.Errorf("Dropping request for %s as there are no validators", vtxID)
}
validatorID := validators[0].ID()
b.RequestID++
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
return nil
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
}
return b.finish()
}
// Process vertices
@ -164,14 +149,17 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
for toProcess.Len() > 0 {
vtx := toProcess.Pop()
vtxID := vtx.ID()
switch vtx.Status() {
case choices.Unknown:
if err := b.fetch(vtx.ID()); err != nil {
return err
}
b.fetch(vtxID)
case choices.Rejected:
b.needToFetch.Remove(vtxID)
return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID())
case choices.Processing:
b.needToFetch.Remove(vtxID)
if err := b.VtxBlocked.Push(&vertexJob{
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSVtx,
@ -216,10 +204,7 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
return err
}
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
return b.finish()
}
return nil
return b.fetch()
}
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
@ -263,11 +248,6 @@ func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte
}
}
// Now there is one less outstanding request; send another if needed
if err := b.fetchANeededVtx(); err != nil {
return err
}
return b.process(processVertices...)
}
@ -293,24 +273,19 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
for _, vtxID := range acceptedContainerIDs.List() {
if vtx, err := b.State.GetVertex(vtxID); err == nil {
storedVtxs = append(storedVtxs, vtx)
} else if err := b.fetch(vtxID); err != nil {
return err
} else {
b.needToFetch.Add(vtxID)
}
}
if err := b.process(storedVtxs...); err != nil {
return err
}
b.processedStartingAcceptedFrontier = true
if numPending := b.outstandingRequests.Len(); numPending == 0 {
return b.finish()
}
return nil
return b.fetch()
}
// Finish bootstrapping
func (b *bootstrapper) finish() error {
if b.finished {
if b.finished || b.outstandingRequests.Len() > 0 || b.needToFetch.Len() > 0 {
return nil
}
b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing transaction state transitions...")