mirror of https://github.com/poanetwork/gecko.git
revert to single-threaded model because multi-threaded model isn't safe
This commit is contained in:
parent
91771359ae
commit
e783af7fa7
|
@ -6,7 +6,6 @@ package avalanche
|
|||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"sync"
|
||||
|
||||
"github.com/ava-labs/gecko/cache"
|
||||
"github.com/ava-labs/gecko/ids"
|
||||
|
@ -19,8 +18,7 @@ import (
|
|||
)
|
||||
|
||||
const (
|
||||
chanSize = 1000
|
||||
cacheSize = 2000
|
||||
cacheSize = 3000
|
||||
)
|
||||
|
||||
// BootstrapConfig ...
|
||||
|
@ -40,29 +38,10 @@ type bootstrapper struct {
|
|||
metrics
|
||||
common.Bootstrapper
|
||||
|
||||
numProcessed uint32 // TODO remove
|
||||
numProcessed uint32
|
||||
|
||||
// outstandingRequests tracks which validators were asked for which containers in which requests
|
||||
outstandingRequests common.Requests
|
||||
outstandingRequestsLock sync.Mutex
|
||||
|
||||
// Incremented before an element is put into needed
|
||||
// Decremented at end of iteration of fetch()
|
||||
// --> If wg is 0, needed is empty and thread isn't in iteration of fetch()
|
||||
// Incremented before an element is put into toProcess
|
||||
// Decremented at end of iteration of process()
|
||||
// --> If wg is 0, toProcess is empty and thread isn't in iteration of process()
|
||||
// Incremented before an element is added to outstandingRequests
|
||||
// Decremented at end of function where an element is removed from outstandingRequests
|
||||
// --> If wg is 0, there are no outstanding requests
|
||||
// Invariant: If wg is 0, bootstrapping is done
|
||||
wg sync.WaitGroup
|
||||
|
||||
// IDs of vertices that we need but don't have, and haven't sent a request for
|
||||
needed chan ids.ID
|
||||
|
||||
// Vertices waiting to be processed
|
||||
toProcess chan avalanche.Vertex
|
||||
outstandingRequests common.Requests
|
||||
|
||||
processedCache *cache.LRU
|
||||
|
||||
|
@ -75,17 +54,17 @@ type bootstrapper struct {
|
|||
// Initialize this engine.
|
||||
func (b *bootstrapper) Initialize(config BootstrapConfig) error {
|
||||
b.BootstrapConfig = config
|
||||
b.needed = make(chan ids.ID, chanSize)
|
||||
b.toProcess = make(chan avalanche.Vertex, chanSize)
|
||||
b.processedCache = &cache.LRU{Size: cacheSize}
|
||||
|
||||
b.VtxBlocked.SetParser(&vtxParser{
|
||||
log: config.Context.Log,
|
||||
numAccepted: b.numBSVtx,
|
||||
numDropped: b.numBSDroppedVtx,
|
||||
state: b.State,
|
||||
})
|
||||
|
||||
b.TxBlocked.SetParser(&txParser{
|
||||
log: config.Context.Log,
|
||||
numAccepted: b.numBSTx,
|
||||
numDropped: b.numBSDroppedTx,
|
||||
vm: b.VM,
|
||||
|
@ -114,107 +93,91 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
|
|||
return acceptedVtxIDs
|
||||
}
|
||||
|
||||
// Constantly fetch vertices we need but don't have
|
||||
func (b *bootstrapper) fetch() {
|
||||
for {
|
||||
toFetch, chanOpen := <-b.needed // take a vertex we need
|
||||
if !chanOpen { // bootstrapping is done
|
||||
return
|
||||
}
|
||||
b.BootstrapConfig.Context.Log.Debug("in fetch. toFetch: %s", toFetch) // TODO remove
|
||||
|
||||
// Make sure we don't already have this vertex
|
||||
if _, err := b.State.GetVertex(toFetch); err == nil {
|
||||
b.wg.Done() // Decremented at end of iteration of fetch()
|
||||
return
|
||||
}
|
||||
|
||||
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
|
||||
if len(validators) == 0 {
|
||||
b.BootstrapConfig.Context.Log.Error("Dropping request for %s as there are no validators", toFetch)
|
||||
b.wg.Add(1) // Incremented before an element is put into needed
|
||||
b.needed <- toFetch // TODO: What to do here? Right now this is an infinite loop...
|
||||
continue
|
||||
}
|
||||
validatorID := validators[0].ID()
|
||||
b.RequestID++
|
||||
|
||||
b.wg.Add(1) // Incremented before an element is added to outstandingRequests
|
||||
b.outstandingRequestsLock.Lock()
|
||||
b.outstandingRequests.Add(validatorID, b.RequestID, toFetch)
|
||||
b.outstandingRequestsLock.Unlock()
|
||||
b.BootstrapConfig.Context.Log.Debug("in fetch. calling GetAncestor(%s, %d, %s)", validatorID, b.RequestID, toFetch) // TODO remove
|
||||
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, toFetch) // request vertex and ancestors
|
||||
b.wg.Done() // Decremented at end of iteration of fetch()
|
||||
// Get a vertex and its ancestors
|
||||
func (b *bootstrapper) fetch(vtxID ids.ID) error {
|
||||
// Make sure we don't already have this vertex
|
||||
if _, err := b.State.GetVertex(vtxID); err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
validators := b.BootstrapConfig.Validators.Sample(1) // validator to send request to
|
||||
if len(validators) == 0 {
|
||||
return fmt.Errorf("Dropping request for %s as there are no validators", vtxID)
|
||||
}
|
||||
validatorID := validators[0].ID()
|
||||
b.RequestID++
|
||||
|
||||
b.outstandingRequests.Add(validatorID, b.RequestID, vtxID)
|
||||
b.BootstrapConfig.Sender.GetAncestors(validatorID, b.RequestID, vtxID) // request vertex and ancestors
|
||||
return nil
|
||||
}
|
||||
|
||||
// Constantly process vertices
|
||||
func (b *bootstrapper) process() {
|
||||
for {
|
||||
vtx, chanOpen := <-b.toProcess // take a vertex we haven't processed
|
||||
if !chanOpen {
|
||||
return
|
||||
// Process vertices
|
||||
func (b *bootstrapper) process(vtx avalanche.Vertex) error {
|
||||
b.numProcessed++ // Progress tracker
|
||||
if b.numProcessed%2500 == 0 {
|
||||
b.BootstrapConfig.Context.Log.Debug("processed %d vertices", b.numProcessed)
|
||||
}
|
||||
|
||||
toProcess := []avalanche.Vertex{vtx}
|
||||
for len(toProcess) > 0 {
|
||||
newLen := len(toProcess) - 1
|
||||
vtx := toProcess[newLen]
|
||||
toProcess = toProcess[:newLen]
|
||||
if _, ok := b.processedCache.Get(vtx.ID()); ok { // already processed this
|
||||
continue
|
||||
}
|
||||
|
||||
// Check if we've already processed this vtx recently
|
||||
if _, processed := b.processedCache.Get(vtx.ID()); processed {
|
||||
b.wg.Done() // Decremented at end of iteration of process()
|
||||
continue
|
||||
} else {
|
||||
switch vtx.Status() {
|
||||
case choices.Unknown:
|
||||
b.fetch(vtx.ID())
|
||||
case choices.Rejected:
|
||||
return fmt.Errorf("tried to accept %s even though it was previously rejected", vtx.ID())
|
||||
case choices.Processing:
|
||||
|
||||
if err := b.VtxBlocked.Push(&vertexJob{
|
||||
log: b.BootstrapConfig.Context.Log,
|
||||
numAccepted: b.numBSVtx,
|
||||
numDropped: b.numBSDroppedVtx,
|
||||
vtx: vtx,
|
||||
}); err == nil {
|
||||
b.numBSBlockedVtx.Inc()
|
||||
} else {
|
||||
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked: %s", err)
|
||||
}
|
||||
if err := b.VtxBlocked.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, tx := range vtx.Txs() {
|
||||
if err := b.TxBlocked.Push(&txJob{
|
||||
log: b.BootstrapConfig.Context.Log,
|
||||
numAccepted: b.numBSTx,
|
||||
numDropped: b.numBSDroppedTx,
|
||||
tx: tx,
|
||||
}); err == nil {
|
||||
b.numBSBlockedTx.Inc()
|
||||
} else {
|
||||
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked: %s", err)
|
||||
}
|
||||
}
|
||||
if err := b.TxBlocked.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
for _, parent := range vtx.Parents() {
|
||||
toProcess = append(toProcess, parent)
|
||||
}
|
||||
b.processedCache.Put(vtx.ID(), vtx.ID())
|
||||
}
|
||||
|
||||
b.numProcessed++
|
||||
if b.numProcessed%1000 == 0 {
|
||||
b.BootstrapConfig.Context.Log.Info("processed %d vertices", b.numProcessed) // TODO remove
|
||||
}
|
||||
|
||||
// process it
|
||||
b.BootstrapConfig.Context.Log.Debug("in process. vtx: %s", vtx.ID()) // TODO remove
|
||||
if err := b.VtxBlocked.Push(&vertexJob{
|
||||
numAccepted: b.numBSVtx,
|
||||
numDropped: b.numBSDroppedVtx,
|
||||
vtx: vtx,
|
||||
}); err == nil {
|
||||
b.numBSBlockedVtx.Inc()
|
||||
} else {
|
||||
b.BootstrapConfig.Context.Log.Fatal("couldn't push to vtxBlocked") // TODO make Verbo
|
||||
}
|
||||
|
||||
for _, tx := range vtx.Txs() {
|
||||
if err := b.TxBlocked.Push(&txJob{
|
||||
numAccepted: b.numBSTx,
|
||||
numDropped: b.numBSDroppedTx,
|
||||
tx: tx,
|
||||
}); err == nil {
|
||||
b.numBSBlockedTx.Inc()
|
||||
} else {
|
||||
b.BootstrapConfig.Context.Log.Fatal("couldn't push to txBlocked") // TODO make Verbo
|
||||
}
|
||||
}
|
||||
|
||||
for _, parent := range vtx.Parents() {
|
||||
b.BootstrapConfig.Context.Log.Debug("parent of %s is %s", vtx.ID(), parent.ID()) // TODO remove
|
||||
if parent.Status() == choices.Unknown {
|
||||
b.BootstrapConfig.Context.Log.Debug("parent %s is unknown. Adding to needed...", parent.ID()) // TODO remove
|
||||
b.wg.Add(1) // Incremented before an element is put into needed
|
||||
b.needed <- parent.ID()
|
||||
} else if parent.Status() == choices.Processing {
|
||||
b.BootstrapConfig.Context.Log.Debug("parent %s is processing. Adding to toProcess...", parent.ID()) // TODO remove
|
||||
b.wg.Add(1) // Incremented before an element is put into toProcess
|
||||
b.toProcess <- parent
|
||||
}
|
||||
}
|
||||
|
||||
b.wg.Done() // Decremented at end of iteration of process()
|
||||
}
|
||||
if numPending := b.outstandingRequests.Len(); numPending == 0 {
|
||||
return b.finish()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Put ...
|
||||
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
|
||||
b.BootstrapConfig.Context.Log.Verbo("in Put(%s, %d, %s)", vdr, requestID, vtxID) // TODO remove
|
||||
vtx, err := b.State.ParseVertex(vtxBytes) // Persists the vtx. vtx.Status() not Unknown.
|
||||
vtx, err := b.State.ParseVertex(vtxBytes) // Persists the vtx. vtx.Status() not Unknown.
|
||||
if err != nil {
|
||||
b.BootstrapConfig.Context.Log.Debug("Failed to parse vertex: %w", err)
|
||||
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
||||
|
@ -225,27 +188,21 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxB
|
|||
// The validator that sent this message said the ID of the vertex inside was [vtxID]
|
||||
// but actually it's [parsedVtxID]
|
||||
if !parsedVtxID.Equals(vtxID) {
|
||||
return b.GetFailed(vdr, requestID) // TODO is this right?
|
||||
return b.GetFailed(vdr, requestID)
|
||||
}
|
||||
|
||||
b.outstandingRequestsLock.Lock()
|
||||
expectedVtxID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
||||
b.outstandingRequestsLock.Unlock()
|
||||
|
||||
if !ok { // there was no outstanding request from this validator for a request with this ID
|
||||
if requestID != math.MaxUint32 { // request ID of math.MaxUint32 means the put was a gossip message. In that case, just return.
|
||||
b.BootstrapConfig.Context.Log.Debug("Unexpected Put. There is no outstanding request to %s with request ID %d", vdr, requestID)
|
||||
}
|
||||
// Don't call b.wg.Done() because nothing was removed from b.outstandingRequests
|
||||
return nil
|
||||
}
|
||||
|
||||
if !expectedVtxID.Equals(parsedVtxID) {
|
||||
b.BootstrapConfig.Context.Log.Debug("Put(%s, %d) contains vertex %s but should contain vertex %s.", vdr, requestID, parsedVtxID, expectedVtxID)
|
||||
b.outstandingRequestsLock.Lock()
|
||||
b.outstandingRequests.Add(vdr, requestID, expectedVtxID) // Just going to be removed by GetFailed...TODO is there a better way to do this?
|
||||
b.outstandingRequestsLock.Unlock()
|
||||
// Don't call b.wg.Done() because nothing was removed from b.outstandingRequests
|
||||
b.outstandingRequests.Add(vdr, requestID, expectedVtxID) // Just going to be removed by GetFailed
|
||||
return b.GetFailed(vdr, requestID)
|
||||
}
|
||||
|
||||
|
@ -256,17 +213,12 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxB
|
|||
return fmt.Errorf("status of vtx %s is Unknown after it was parsed", vtxID)
|
||||
}
|
||||
|
||||
b.wg.Add(1) // Incremented before an element is put into toProcess
|
||||
b.toProcess <- vtx
|
||||
b.wg.Done() // Decremented at end of function where an element is removed from outstandingRequests
|
||||
|
||||
return nil
|
||||
return b.process(vtx)
|
||||
}
|
||||
|
||||
// PutAncestor ...
|
||||
func (b *bootstrapper) PutAncestor(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) error {
|
||||
b.BootstrapConfig.Context.Log.Debug("in PutAncestor(%s, %d, %s)", vdr, requestID, vtxID) // TODO remove
|
||||
_, err := b.State.ParseVertex(vtxBytes) // Persists the vtx
|
||||
_, err := b.State.ParseVertex(vtxBytes) // Persists the vtx
|
||||
if err != nil {
|
||||
b.BootstrapConfig.Context.Log.Debug("Failed to parse vertex: %w", err)
|
||||
b.BootstrapConfig.Context.Log.Verbo("vertex: %s", formatting.DumpBytes{Bytes: vtxBytes})
|
||||
|
@ -276,76 +228,66 @@ func (b *bootstrapper) PutAncestor(vdr ids.ShortID, requestID uint32, vtxID ids.
|
|||
|
||||
// GetFailed is called when a Get message we sent fails
|
||||
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
|
||||
b.BootstrapConfig.Context.Log.Debug("in GetFailed(%s, %d)", vdr, requestID) // TODO remove
|
||||
b.outstandingRequestsLock.Lock()
|
||||
vtxID, ok := b.outstandingRequests.Remove(vdr, requestID)
|
||||
b.outstandingRequestsLock.Unlock()
|
||||
if !ok {
|
||||
b.BootstrapConfig.Context.Log.Verbo("GetFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
|
||||
b.BootstrapConfig.Context.Log.Debug("GetFailed(%s, %d) called but there was no outstanding request to this validator with this ID", vdr, requestID)
|
||||
return nil
|
||||
}
|
||||
// Send another request for this
|
||||
b.wg.Add(1) // Incremented before an element is put into needed
|
||||
b.needed <- vtxID
|
||||
b.wg.Done() // Decremented at end of function where an element is removed from outstandingRequests
|
||||
return nil
|
||||
return b.fetch(vtxID)
|
||||
}
|
||||
|
||||
// ForceAccepted ...
|
||||
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
|
||||
b.BootstrapConfig.Context.Log.Debug("in forceAccepted") // TODO remove
|
||||
if acceptedContainerIDs.Len() == 0 {
|
||||
b.finish()
|
||||
return nil
|
||||
}
|
||||
|
||||
for _, vtxID := range acceptedContainerIDs.List() {
|
||||
b.BootstrapConfig.Context.Log.Debug("in forceAccepted. vtxID: %s", vtxID) // TODO remove
|
||||
vtx, err := b.State.GetVertex(vtxID)
|
||||
if err != nil || vtx.Status() == choices.Unknown {
|
||||
b.BootstrapConfig.Context.Log.Debug("in forceAccepted. adding %s to needed", vtxID) // TODO remove
|
||||
b.wg.Add(1) // Incremented before an element is put into needed
|
||||
b.needed <- vtxID
|
||||
} else if vtx.Status() == choices.Processing {
|
||||
b.BootstrapConfig.Context.Log.Debug("in forceAccepted. adding: %s to toProcess", vtx.ID()) // TODO remove
|
||||
b.wg.Add(1) // Incremented before an element is put into toProcess
|
||||
b.toProcess <- vtx
|
||||
if vtx, err := b.State.GetVertex(vtxID); err == nil {
|
||||
b.process(vtx)
|
||||
} else if err := b.fetch(vtxID); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// TODO start threads
|
||||
go b.fetch()
|
||||
go b.process()
|
||||
go func() {
|
||||
b.wg.Wait() // wait until bootstrapping is done
|
||||
b.finish()
|
||||
}()
|
||||
if numPending := b.outstandingRequests.Len(); numPending == 0 {
|
||||
// TODO: This typically indicates bootstrapping has failed, so this
|
||||
// should be handled appropriately
|
||||
return b.finish()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Finish bootstrapping
|
||||
func (b *bootstrapper) finish() {
|
||||
func (b *bootstrapper) finish() error {
|
||||
if b.finished {
|
||||
return
|
||||
return nil
|
||||
}
|
||||
b.BootstrapConfig.Context.Log.Info("bootstrapping finished fetching vertices. executing state transitions...")
|
||||
|
||||
b.executeAll(b.TxBlocked, b.numBSBlockedTx)
|
||||
b.executeAll(b.VtxBlocked, b.numBSBlockedVtx)
|
||||
if err := b.executeAll(b.TxBlocked, b.numBSBlockedTx); err != nil {
|
||||
return err
|
||||
}
|
||||
if err := b.executeAll(b.VtxBlocked, b.numBSBlockedVtx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Start consensus
|
||||
b.onFinished()
|
||||
close(b.toProcess)
|
||||
close(b.needed)
|
||||
if err := b.onFinished(); err != nil {
|
||||
return err
|
||||
}
|
||||
b.finished = true
|
||||
return nil
|
||||
}
|
||||
|
||||
func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge) {
|
||||
func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge) error {
|
||||
for job, err := jobs.Pop(); err == nil; job, err = jobs.Pop() {
|
||||
numBlocked.Dec()
|
||||
b.BootstrapConfig.Context.Log.Debug("Executing: %s", job.ID())
|
||||
if err := jobs.Execute(job); err != nil {
|
||||
b.BootstrapConfig.Context.Log.Warn("Error executing: %s", err)
|
||||
b.BootstrapConfig.Context.Log.Error("Error executing: %s", err)
|
||||
return err
|
||||
}
|
||||
if err := jobs.Commit(); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -12,7 +12,7 @@ import (
|
|||
|
||||
const (
|
||||
// AncestorsToFetch is the maximum number of ancestors that should be returned in response to a GetAncestors call
|
||||
AncestorsToFetch = 200
|
||||
AncestorsToFetch = 1000
|
||||
)
|
||||
|
||||
// Bootstrapper implements the Engine interface.
|
||||
|
|
Loading…
Reference in New Issue