diff --git a/ids/set.go b/ids/set.go index c3aa024..d632949 100644 --- a/ids/set.go +++ b/ids/set.go @@ -78,7 +78,7 @@ func (ids *Set) Clear() { *ids = nil } // List converts this set into a list func (ids Set) List() []ID { - idList := make([]ID, ids.Len(), ids.Len()) + idList := make([]ID, ids.Len()) i := 0 for id := range ids { idList[i] = NewID(id) @@ -87,6 +87,27 @@ func (ids Set) List() []ID { return idList } +// CappedList returns a list of length at most [size]. +// Size should be >= 0. If size < 0, returns nil. +func (ids Set) CappedList(size int) []ID { + if size < 0 { + return nil + } + if l := ids.Len(); l < size { + size = l + } + i := 0 + idList := make([]ID, size) + for id := range ids { + if i >= size { + break + } + idList[i] = NewID(id) + i++ + } + return idList +} + // Equals returns true if the sets contain the same elements func (ids Set) Equals(oIDs Set) bool { if ids.Len() != oIDs.Len() { diff --git a/ids/set_test.go b/ids/set_test.go index 3c7ab15..b4e05db 100644 --- a/ids/set_test.go +++ b/ids/set_test.go @@ -55,3 +55,46 @@ func TestSet(t *testing.T) { t.Fatalf("Sets overlap") } } + +func TestSetCappedList(t *testing.T) { + set := Set{} + + id := Empty + + if list := set.CappedList(0); len(list) != 0 { + t.Fatalf("List should have been empty but was %v", list) + } + + set.Add(id) + + if list := set.CappedList(0); len(list) != 0 { + t.Fatalf("List should have been empty but was %v", list) + } else if list := set.CappedList(1); len(list) != 1 { + t.Fatalf("List should have had length %d but had %d", 1, len(list)) + } else if returnedID := list[0]; !id.Equals(returnedID) { + t.Fatalf("List should have been %s but was %s", id, returnedID) + } else if list := set.CappedList(2); len(list) != 1 { + t.Fatalf("List should have had length %d but had %d", 1, len(list)) + } else if returnedID := list[0]; !id.Equals(returnedID) { + t.Fatalf("List should have been %s but was %s", id, returnedID) + } + + id2 := NewID([32]byte{1}) + set.Add(id2) + + if list := set.CappedList(0); len(list) != 0 { + t.Fatalf("List should have been empty but was %v", list) + } else if list := set.CappedList(1); len(list) != 1 { + t.Fatalf("List should have had length %d but had %d", 1, len(list)) + } else if returnedID := list[0]; !id.Equals(returnedID) && !id2.Equals(returnedID) { + t.Fatalf("List should have been %s but was %s", id, returnedID) + } else if list := set.CappedList(2); len(list) != 2 { + t.Fatalf("List should have had length %d but had %d", 2, len(list)) + } else if list := set.CappedList(3); len(list) != 2 { + t.Fatalf("List should have had length %d but had %d", 2, len(list)) + } else if returnedID := list[0]; !id.Equals(returnedID) && !id2.Equals(returnedID) { + t.Fatalf("list contains unexpected element %s", returnedID) + } else if returnedID := list[1]; !id.Equals(returnedID) && !id2.Equals(returnedID) { + t.Fatalf("list contains unexpected element %s", returnedID) + } +} diff --git a/snow/engine/avalanche/bootstrapper.go b/snow/engine/avalanche/bootstrapper.go index 3ed58c7..1af48bf 100644 --- a/snow/engine/avalanche/bootstrapper.go +++ b/snow/engine/avalanche/bootstrapper.go @@ -42,15 +42,16 @@ type bootstrapper struct { metrics common.Bootstrapper - // true if all of the vertices in the original accepted frontier have been processed - processedStartingAcceptedFrontier bool - // number of vertices fetched so far numFetched uint32 // tracks which validators were asked for which containers in which requests outstandingRequests common.Requests + // IDs of vertices that we will send a GetAncestors request for once we are + // not at the max number of outstanding requests + needToFetch ids.Set + // Contains IDs of vertices that have recently been processed processedCache *cache.LRU @@ -103,31 +104,36 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set { return acceptedVtxIDs } -// Get vertex [vtxID] and its ancestors -func (b *bootstrapper) fetch(vtxID ids.ID) error { - // Make sure we haven't already requested this block - if b.outstandingRequests.Contains(vtxID) { - return nil - } +// Fetch vertices and their ancestors from the set of vertices that are needed +// to be fetched. +func (b *bootstrapper) fetch(vtxIDs ...ids.ID) error { + b.needToFetch.Add(vtxIDs...) + for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < common.MaxOutstandingRequests { + vtxID := b.needToFetch.CappedList(1)[0] + b.needToFetch.Remove(vtxID) - // Make sure we don't already have this vertex - if _, err := b.State.GetVertex(vtxID); err == nil { - if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier { - return b.finish() + // Make sure we haven't already requested this vertex + if b.outstandingRequests.Contains(vtxID) { + continue } - return nil - } - validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to - if len(validators) == 0 { - return fmt.Errorf("Dropping request for %s as there are no validators", vtxID) - } - validatorID := validators[0].ID() - b.RequestID++ + // Make sure we don't already have this vertex + if _, err := b.State.GetVertex(vtxID); err == nil { + continue + } - b.outstandingRequests.Add(validatorID, b.RequestID, vtxID) - b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors - return nil + validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to + if len(validators) == 0 { + return fmt.Errorf("Dropping request for %s as there are no validators", vtxID) + } + validatorID := validators[0].ID() + b.RequestID++ + + b.outstandingRequests.Add(validatorID, b.RequestID, vtxID) + b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty + b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors + } + return b.finish() } // Process vertices @@ -141,14 +147,17 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error { for toProcess.Len() > 0 { vtx := toProcess.Pop() + vtxID := vtx.ID() + switch vtx.Status() { case choices.Unknown: - if err := b.fetch(vtx.ID()); err != nil { - return err - } + b.needToFetch.Add(vtxID) case choices.Rejected: + b.needToFetch.Remove(vtxID) return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID()) case choices.Processing: + b.needToFetch.Remove(vtxID) + if err := b.VtxBlocked.Push(&vertexJob{ log: b.BootstrapConfig.Context.Log, numAccepted: b.numBSVtx, @@ -193,10 +202,7 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error { return err } - if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier { - return b.finish() - } - return nil + return b.fetch() } // MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr] @@ -236,6 +242,7 @@ func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes}) } else { processVertices = append(processVertices, vtx) + b.needToFetch.Remove(vtx.ID()) // No need to fetch this vertex since we have it now } } @@ -264,24 +271,16 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error { for _, vtxID := range acceptedContainerIDs.List() { if vtx, err := b.State.GetVertex(vtxID); err == nil { storedVtxs = append(storedVtxs, vtx) - } else if err := b.fetch(vtxID); err != nil { - return err + } else { + b.needToFetch.Add(vtxID) } } - if err := b.process(storedVtxs...); err != nil { - return err - } - b.processedStartingAcceptedFrontier = true - - if numPending := b.outstandingRequests.Len(); numPending == 0 { - return b.finish() - } - return nil + return b.process(storedVtxs...) } // Finish bootstrapping func (b *bootstrapper) finish() error { - if b.finished { + if b.finished || b.outstandingRequests.Len() > 0 || b.needToFetch.Len() > 0 { return nil } b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing transaction state transitions...") diff --git a/snow/engine/common/bootstrapper.go b/snow/engine/common/bootstrapper.go index 49f4051..f1f58db 100644 --- a/snow/engine/common/bootstrapper.go +++ b/snow/engine/common/bootstrapper.go @@ -17,11 +17,14 @@ const ( // StatusUpdateFrequency ... bootstrapper logs "processed X blocks/vertices" every [statusUpdateFrequency] blocks/vertices StatusUpdateFrequency = 2500 + + // MaxOutstandingRequests is the maximum number of GetAncestors sent but not responsded to/failed + MaxOutstandingRequests = 8 ) var ( // MaxTimeFetchingAncestors is the maximum amount of time to spend fetching vertices during a call to GetAncestors - MaxTimeFetchingAncestors = 100 * time.Millisecond + MaxTimeFetchingAncestors = 50 * time.Millisecond ) // Bootstrapper implements the Engine interface.