mirror of https://github.com/poanetwork/gecko.git
Merge pull request #99 from ava-labs/limit-outstanding-requests
Limit # outstanding GetAncestors to 8; reduce MaxTimeFetchingAncestors
This commit is contained in:
commit
5361fb796b
23
ids/set.go
23
ids/set.go
|
@ -78,7 +78,7 @@ func (ids *Set) Clear() { *ids = nil }
|
||||||
|
|
||||||
// List converts this set into a list
|
// List converts this set into a list
|
||||||
func (ids Set) List() []ID {
|
func (ids Set) List() []ID {
|
||||||
idList := make([]ID, ids.Len(), ids.Len())
|
idList := make([]ID, ids.Len())
|
||||||
i := 0
|
i := 0
|
||||||
for id := range ids {
|
for id := range ids {
|
||||||
idList[i] = NewID(id)
|
idList[i] = NewID(id)
|
||||||
|
@ -87,6 +87,27 @@ func (ids Set) List() []ID {
|
||||||
return idList
|
return idList
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// CappedList returns a list of length at most [size].
|
||||||
|
// Size should be >= 0. If size < 0, returns nil.
|
||||||
|
func (ids Set) CappedList(size int) []ID {
|
||||||
|
if size < 0 {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if l := ids.Len(); l < size {
|
||||||
|
size = l
|
||||||
|
}
|
||||||
|
i := 0
|
||||||
|
idList := make([]ID, size)
|
||||||
|
for id := range ids {
|
||||||
|
if i >= size {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
idList[i] = NewID(id)
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
return idList
|
||||||
|
}
|
||||||
|
|
||||||
// Equals returns true if the sets contain the same elements
|
// Equals returns true if the sets contain the same elements
|
||||||
func (ids Set) Equals(oIDs Set) bool {
|
func (ids Set) Equals(oIDs Set) bool {
|
||||||
if ids.Len() != oIDs.Len() {
|
if ids.Len() != oIDs.Len() {
|
||||||
|
|
|
@ -55,3 +55,46 @@ func TestSet(t *testing.T) {
|
||||||
t.Fatalf("Sets overlap")
|
t.Fatalf("Sets overlap")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestSetCappedList(t *testing.T) {
|
||||||
|
set := Set{}
|
||||||
|
|
||||||
|
id := Empty
|
||||||
|
|
||||||
|
if list := set.CappedList(0); len(list) != 0 {
|
||||||
|
t.Fatalf("List should have been empty but was %v", list)
|
||||||
|
}
|
||||||
|
|
||||||
|
set.Add(id)
|
||||||
|
|
||||||
|
if list := set.CappedList(0); len(list) != 0 {
|
||||||
|
t.Fatalf("List should have been empty but was %v", list)
|
||||||
|
} else if list := set.CappedList(1); len(list) != 1 {
|
||||||
|
t.Fatalf("List should have had length %d but had %d", 1, len(list))
|
||||||
|
} else if returnedID := list[0]; !id.Equals(returnedID) {
|
||||||
|
t.Fatalf("List should have been %s but was %s", id, returnedID)
|
||||||
|
} else if list := set.CappedList(2); len(list) != 1 {
|
||||||
|
t.Fatalf("List should have had length %d but had %d", 1, len(list))
|
||||||
|
} else if returnedID := list[0]; !id.Equals(returnedID) {
|
||||||
|
t.Fatalf("List should have been %s but was %s", id, returnedID)
|
||||||
|
}
|
||||||
|
|
||||||
|
id2 := NewID([32]byte{1})
|
||||||
|
set.Add(id2)
|
||||||
|
|
||||||
|
if list := set.CappedList(0); len(list) != 0 {
|
||||||
|
t.Fatalf("List should have been empty but was %v", list)
|
||||||
|
} else if list := set.CappedList(1); len(list) != 1 {
|
||||||
|
t.Fatalf("List should have had length %d but had %d", 1, len(list))
|
||||||
|
} else if returnedID := list[0]; !id.Equals(returnedID) && !id2.Equals(returnedID) {
|
||||||
|
t.Fatalf("List should have been %s but was %s", id, returnedID)
|
||||||
|
} else if list := set.CappedList(2); len(list) != 2 {
|
||||||
|
t.Fatalf("List should have had length %d but had %d", 2, len(list))
|
||||||
|
} else if list := set.CappedList(3); len(list) != 2 {
|
||||||
|
t.Fatalf("List should have had length %d but had %d", 2, len(list))
|
||||||
|
} else if returnedID := list[0]; !id.Equals(returnedID) && !id2.Equals(returnedID) {
|
||||||
|
t.Fatalf("list contains unexpected element %s", returnedID)
|
||||||
|
} else if returnedID := list[1]; !id.Equals(returnedID) && !id2.Equals(returnedID) {
|
||||||
|
t.Fatalf("list contains unexpected element %s", returnedID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
|
@ -42,15 +42,16 @@ type bootstrapper struct {
|
||||||
metrics
|
metrics
|
||||||
common.Bootstrapper
|
common.Bootstrapper
|
||||||
|
|
||||||
// true if all of the vertices in the original accepted frontier have been processed
|
|
||||||
processedStartingAcceptedFrontier bool
|
|
||||||
|
|
||||||
// number of vertices fetched so far
|
// number of vertices fetched so far
|
||||||
numFetched uint32
|
numFetched uint32
|
||||||
|
|
||||||
// tracks which validators were asked for which containers in which requests
|
// tracks which validators were asked for which containers in which requests
|
||||||
outstandingRequests common.Requests
|
outstandingRequests common.Requests
|
||||||
|
|
||||||
|
// IDs of vertices that we will send a GetAncestors request for once we are
|
||||||
|
// not at the max number of outstanding requests
|
||||||
|
needToFetch ids.Set
|
||||||
|
|
||||||
// Contains IDs of vertices that have recently been processed
|
// Contains IDs of vertices that have recently been processed
|
||||||
processedCache *cache.LRU
|
processedCache *cache.LRU
|
||||||
|
|
||||||
|
@ -103,19 +104,22 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
|
||||||
return acceptedVtxIDs
|
return acceptedVtxIDs
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get vertex [vtxID] and its ancestors
|
// Fetch vertices and their ancestors from the set of vertices that are needed
|
||||||
func (b *bootstrapper) fetch(vtxID ids.ID) error {
|
// to be fetched.
|
||||||
// Make sure we haven't already requested this block
|
func (b *bootstrapper) fetch(vtxIDs ...ids.ID) error {
|
||||||
|
b.needToFetch.Add(vtxIDs...)
|
||||||
|
for b.needToFetch.Len() > 0 && b.outstandingRequests.Len() < common.MaxOutstandingRequests {
|
||||||
|
vtxID := b.needToFetch.CappedList(1)[0]
|
||||||
|
b.needToFetch.Remove(vtxID)
|
||||||
|
|
||||||
|
// Make sure we haven't already requested this vertex
|
||||||
if b.outstandingRequests.Contains(vtxID) {
|
if b.outstandingRequests.Contains(vtxID) {
|
||||||
return nil
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// Make sure we don't already have this vertex
|
// Make sure we don't already have this vertex
|
||||||
if _, err := b.State.GetVertex(vtxID); err == nil {
|
if _, err := b.State.GetVertex(vtxID); err == nil {
|
||||||
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
|
continue
|
||||||
return b.finish()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
|
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
|
||||||
|
@ -126,8 +130,10 @@ func (b *bootstrapper) fetch(vtxID ids.ID) error {
|
||||||
b.RequestID++
|
b.RequestID++
|
||||||
|
|
||||||
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
|
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
|
||||||
|
b.needToFetch.Remove(vtxID) // maintains invariant that intersection with outstandingRequests is empty
|
||||||
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
|
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
|
||||||
return nil
|
}
|
||||||
|
return b.finish()
|
||||||
}
|
}
|
||||||
|
|
||||||
// Process vertices
|
// Process vertices
|
||||||
|
@ -141,14 +147,17 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
|
||||||
|
|
||||||
for toProcess.Len() > 0 {
|
for toProcess.Len() > 0 {
|
||||||
vtx := toProcess.Pop()
|
vtx := toProcess.Pop()
|
||||||
|
vtxID := vtx.ID()
|
||||||
|
|
||||||
switch vtx.Status() {
|
switch vtx.Status() {
|
||||||
case choices.Unknown:
|
case choices.Unknown:
|
||||||
if err := b.fetch(vtx.ID()); err != nil {
|
b.needToFetch.Add(vtxID)
|
||||||
return err
|
|
||||||
}
|
|
||||||
case choices.Rejected:
|
case choices.Rejected:
|
||||||
|
b.needToFetch.Remove(vtxID)
|
||||||
return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID())
|
return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID())
|
||||||
case choices.Processing:
|
case choices.Processing:
|
||||||
|
b.needToFetch.Remove(vtxID)
|
||||||
|
|
||||||
if err := b.VtxBlocked.Push(&vertexJob{
|
if err := b.VtxBlocked.Push(&vertexJob{
|
||||||
log: b.BootstrapConfig.Context.Log,
|
log: b.BootstrapConfig.Context.Log,
|
||||||
numAccepted: b.numBSVtx,
|
numAccepted: b.numBSVtx,
|
||||||
|
@ -193,10 +202,7 @@ func (b *bootstrapper) process(vtxs ...avalanche.Vertex) error {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
|
return b.fetch()
|
||||||
return b.finish()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
|
// MultiPut handles the receipt of multiple containers. Should be received in response to a GetAncestors message to [vdr]
|
||||||
|
@ -236,6 +242,7 @@ func (b *bootstrapper) MultiPut(vdr ids.ShortID, requestID uint32, vtxs [][]byte
|
||||||
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
||||||
} else {
|
} else {
|
||||||
processVertices = append(processVertices, vtx)
|
processVertices = append(processVertices, vtx)
|
||||||
|
b.needToFetch.Remove(vtx.ID()) // No need to fetch this vertex since we have it now
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -264,24 +271,16 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
||||||
for _, vtxID := range acceptedContainerIDs.List() {
|
for _, vtxID := range acceptedContainerIDs.List() {
|
||||||
if vtx, err := b.State.GetVertex(vtxID); err == nil {
|
if vtx, err := b.State.GetVertex(vtxID); err == nil {
|
||||||
storedVtxs = append(storedVtxs, vtx)
|
storedVtxs = append(storedVtxs, vtx)
|
||||||
} else if err := b.fetch(vtxID); err != nil {
|
} else {
|
||||||
return err
|
b.needToFetch.Add(vtxID)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
if err := b.process(storedVtxs...); err != nil {
|
return b.process(storedVtxs...)
|
||||||
return err
|
|
||||||
}
|
|
||||||
b.processedStartingAcceptedFrontier = true
|
|
||||||
|
|
||||||
if numPending := b.outstandingRequests.Len(); numPending == 0 {
|
|
||||||
return b.finish()
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finish bootstrapping
|
// Finish bootstrapping
|
||||||
func (b *bootstrapper) finish() error {
|
func (b *bootstrapper) finish() error {
|
||||||
if b.finished {
|
if b.finished || b.outstandingRequests.Len() > 0 || b.needToFetch.Len() > 0 {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing transaction state transitions...")
|
b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing transaction state transitions...")
|
||||||
|
|
|
@ -17,11 +17,14 @@ const (
|
||||||
|
|
||||||
// StatusUpdateFrequency ... bootstrapper logs "processed X blocks/vertices" every [statusUpdateFrequency] blocks/vertices
|
// StatusUpdateFrequency ... bootstrapper logs "processed X blocks/vertices" every [statusUpdateFrequency] blocks/vertices
|
||||||
StatusUpdateFrequency = 2500
|
StatusUpdateFrequency = 2500
|
||||||
|
|
||||||
|
// MaxOutstandingRequests is the maximum number of GetAncestors sent but not responsded to/failed
|
||||||
|
MaxOutstandingRequests = 8
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// MaxTimeFetchingAncestors is the maximum amount of time to spend fetching vertices during a call to GetAncestors
|
// MaxTimeFetchingAncestors is the maximum amount of time to spend fetching vertices during a call to GetAncestors
|
||||||
MaxTimeFetchingAncestors = 100 * time.Millisecond
|
MaxTimeFetchingAncestors = 50 * time.Millisecond
|
||||||
)
|
)
|
||||||
|
|
||||||
// Bootstrapper implements the Engine interface.
|
// Bootstrapper implements the Engine interface.
|
||||||
|
|
Loading…
Reference in New Issue