Removed memory leak in bootstrapping

This commit is contained in:
StephenButtolph 2020-05-29 15:03:00 -04:00
parent 312e17bfb3
commit bdf9f27f7d
3 changed files with 48 additions and 18 deletions

View File

@ -4,6 +4,8 @@
package avalanche
import (
"fmt"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
@ -89,7 +91,9 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
// ForceAccepted ...
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
for _, vtxID := range acceptedContainerIDs.List() {
b.fetch(vtxID)
if err := b.fetch(vtxID); err != nil {
return err
}
}
if numPending := b.pending.Len(); numPending == 0 {
@ -135,17 +139,17 @@ func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
return nil
}
func (b *bootstrapper) fetch(vtxID ids.ID) {
func (b *bootstrapper) fetch(vtxID ids.ID) error {
if b.pending.Contains(vtxID) {
return
return nil
}
vtx, err := b.State.GetVertex(vtxID)
if err != nil {
b.sendRequest(vtxID)
return
return nil
}
b.storeVertex(vtx)
return b.storeVertex(vtx)
}
func (b *bootstrapper) sendRequest(vtxID ids.ID) {
@ -167,7 +171,9 @@ func (b *bootstrapper) sendRequest(vtxID ids.ID) {
}
func (b *bootstrapper) addVertex(vtx avalanche.Vertex) error {
b.storeVertex(vtx)
if err := b.storeVertex(vtx); err != nil {
return err
}
if numPending := b.pending.Len(); numPending == 0 {
return b.finish()
@ -175,7 +181,7 @@ func (b *bootstrapper) addVertex(vtx avalanche.Vertex) error {
return nil
}
func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) error {
vts := []avalanche.Vertex{vtx}
b.numFetched++
if b.numFetched%2500 == 0 { // perioidcally inform user of progress
@ -204,6 +210,9 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked")
}
if err := b.VtxBlocked.Commit(); err != nil {
return err
}
for _, tx := range vtx.Txs() {
if err := b.TxBlocked.Push(&txJob{
log: b.BootstrapConfig.Context.Log,
@ -216,6 +225,9 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked")
}
}
if err := b.TxBlocked.Commit(); err != nil {
return err
}
for _, parent := range vtx.Parents() {
if parentID := parent.ID(); !b.seen.Contains(parentID) {
b.seen.Add(parentID)
@ -223,14 +235,15 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
}
}
case choices.Accepted:
b.BootstrapConfig.Context.Log.Verbo("Bootstrapping confirmed %s", vtxID)
b.BootstrapConfig.Context.Log.Verbo("bootstrapping confirmed %s", vtxID)
case choices.Rejected:
b.BootstrapConfig.Context.Log.Error("Bootstrapping wants to accept %s, however it was previously rejected", vtxID)
return fmt.Errorf("bootstrapping wants to accept %s, however it was previously rejected", vtxID)
}
}
numPending := b.pending.Len()
b.numBSPendingRequests.Set(float64(numPending))
return nil
}
func (b *bootstrapper) finish() error {
@ -263,6 +276,9 @@ func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge)
b.BootstrapConfig.Context.Log.Error("Error executing: %s", err)
return err
}
if err := jobs.Commit(); err != nil {
return err
}
}
return nil
}

View File

@ -4,6 +4,8 @@
package snowman
import (
"fmt"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/snowman"
@ -75,7 +77,9 @@ func (b *bootstrapper) FilterAccepted(containerIDs ids.Set) ids.Set {
// ForceAccepted ...
func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) error {
for _, blkID := range acceptedContainerIDs.List() {
b.fetch(blkID)
if err := b.fetch(blkID); err != nil {
return err
}
}
if numPending := b.pending.Len(); numPending == 0 {
@ -124,17 +128,17 @@ func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) error {
return nil
}
func (b *bootstrapper) fetch(blkID ids.ID) {
func (b *bootstrapper) fetch(blkID ids.ID) error {
if b.pending.Contains(blkID) {
return
return nil
}
blk, err := b.VM.GetBlock(blkID)
if err != nil {
b.sendRequest(blkID)
return
return nil
}
b.storeBlock(blk)
return b.storeBlock(blk)
}
func (b *bootstrapper) sendRequest(blkID ids.ID) {
@ -156,7 +160,9 @@ func (b *bootstrapper) sendRequest(blkID ids.ID) {
}
func (b *bootstrapper) addBlock(blk snowman.Block) error {
b.storeBlock(blk)
if err := b.storeBlock(blk); err != nil {
return err
}
if numPending := b.pending.Len(); numPending == 0 {
return b.finish()
@ -164,7 +170,7 @@ func (b *bootstrapper) addBlock(blk snowman.Block) error {
return nil
}
func (b *bootstrapper) storeBlock(blk snowman.Block) {
func (b *bootstrapper) storeBlock(blk snowman.Block) error {
status := blk.Status()
blkID := blk.ID()
for status == choices.Processing {
@ -178,6 +184,10 @@ func (b *bootstrapper) storeBlock(blk snowman.Block) {
b.numBlocked.Inc()
}
if err := b.Blocked.Commit(); err != nil {
return err
}
blk = blk.Parent()
status = blk.Status()
blkID = blk.ID()
@ -189,11 +199,12 @@ func (b *bootstrapper) storeBlock(blk snowman.Block) {
case choices.Accepted:
b.BootstrapConfig.Context.Log.Verbo("Bootstrapping confirmed %s", blkID)
case choices.Rejected:
b.BootstrapConfig.Context.Log.Error("Bootstrapping wants to accept %s, however it was previously rejected", blkID)
return fmt.Errorf("bootstrapping wants to accept %s, however it was previously rejected", blkID)
}
numPending := b.pending.Len()
b.numPendingRequests.Set(float64(numPending))
return nil
}
func (b *bootstrapper) finish() error {
@ -223,6 +234,9 @@ func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge)
if err := jobs.Execute(job); err != nil {
return err
}
if err := jobs.Commit(); err != nil {
return err
}
}
return nil
}

View File

@ -120,7 +120,7 @@ func (h *Handler) dispatchMsg(msg message) bool {
}
if err != nil {
ctx.Log.Error("fatal error occurred on chain %s, forcing a shutdown due to %s", err)
ctx.Log.Fatal("forcing chain to shutdown due to %s", err)
}
return done || err != nil
}