Improved X-chain bootstrapping

This commit is contained in:
StephenButtolph 2020-06-08 03:10:14 -04:00
parent f29bee1944
commit 921cca2ce1
7 changed files with 77 additions and 42 deletions

View File

@ -155,9 +155,6 @@ func (b *bootstrapper) process(vtx avalanche.Vertex) error {
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked: %s", err)
}
if err := b.VtxBlocked.Commit(); err != nil {
return err
}
for _, tx := range vtx.Txs() {
if err := b.TxBlocked.Push(&txJob{
log: b.BootstrapConfig.Context.Log,
@ -170,15 +167,20 @@ func (b *bootstrapper) process(vtx avalanche.Vertex) error {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked: %s", err)
}
}
if err := b.TxBlocked.Commit(); err != nil {
return err
}
for _, parent := range vtx.Parents() {
toProcess = append(toProcess, parent)
}
b.processedCache.Put(vtx.ID(), nil)
}
}
if err := b.VtxBlocked.Commit(); err != nil {
return err
}
if err := b.TxBlocked.Commit(); err != nil {
return err
}
if numPending := b.outstandingRequests.Len(); numPending == 0 && b.processedStartingAcceptedFrontier {
return b.finish()
}
@ -263,11 +265,14 @@ func (b *bootstrapper) finish() error {
if b.finished {
return nil
}
b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing state transitions...")
b.BootstrapConfig.Context.Log.Info("finished fetching vertices. executing transaction state transitions...")
if err := b.executeAll(b.TxBlocked, b.numBSBlockedTx); err != nil {
return err
}
b.BootstrapConfig.Context.Log.Info("executing vertex state transitions...")
if err := b.executeAll(b.VtxBlocked, b.numBSBlockedVtx); err != nil {
return err
}
@ -286,6 +291,7 @@ func (b *bootstrapper) finish() error {
}
func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge) error {
numExecuted := 0
for job, err := jobs.Pop(); err == nil; job, err = jobs.Pop() {
numBlocked.Dec()
b.BootstrapConfig.Context.Log.Debug("Executing: %s", job.ID())
@ -296,6 +302,10 @@ func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge)
if err := jobs.Commit(); err != nil {
return err
}
numExecuted++
if numExecuted%common.StatusUpdateFrequency == 0 { // Periodically print progress
b.BootstrapConfig.Context.Log.Info("executed %d operations", numExecuted)
}
}
return nil
}

View File

@ -85,10 +85,13 @@ func (j *Jobs) Execute(job Job) error {
jobID := job.ID()
blocking, _ := j.state.Blocking(j.db, jobID)
j.state.DeleteBlocking(j.db, jobID)
blocking, err := j.state.Blocking(j.db, jobID)
if err != nil {
return err
}
j.state.DeleteBlocking(j.db, jobID, blocking)
for _, blockedID := range blocking.List() {
for _, blockedID := range blocking {
job, err := j.state.Job(j.db, blockedID)
if err != nil {
return err
@ -142,9 +145,7 @@ func (j *Jobs) block(job Job, deps ids.Set) error {
jobID := job.ID()
for _, depID := range deps.List() {
blocking, _ := j.state.Blocking(j.db, depID)
blocking.Add(jobID)
if err := j.state.SetBlocking(j.db, depID, blocking); err != nil {
if err := j.state.AddBlocking(j.db, depID, jobID); err != nil {
return err
}
}

View File

@ -95,25 +95,31 @@ func (ps *prefixedState) Job(db database.Database, id ids.ID) (Job, error) {
return ps.state.Job(db, p.Bytes)
}
func (ps *prefixedState) SetBlocking(db database.Database, id ids.ID, blocking ids.Set) error {
func (ps *prefixedState) AddBlocking(db database.Database, id ids.ID, blocking ids.ID) error {
p := wrappers.Packer{Bytes: make([]byte, 1+hashing.HashLen)}
p.PackByte(blockingID)
p.PackFixedBytes(id.Bytes())
return ps.state.SetIDs(db, p.Bytes, blocking)
return ps.state.AddID(db, p.Bytes, blocking)
}
func (ps *prefixedState) DeleteBlocking(db database.Database, id ids.ID) error {
func (ps *prefixedState) DeleteBlocking(db database.Database, id ids.ID, blocking []ids.ID) error {
p := wrappers.Packer{Bytes: make([]byte, 1+hashing.HashLen)}
p.PackByte(blockingID)
p.PackFixedBytes(id.Bytes())
return db.Delete(p.Bytes)
for _, blocked := range blocking {
if err := ps.state.RemoveID(db, p.Bytes, blocked); err != nil {
return err
}
}
return nil
}
func (ps *prefixedState) Blocking(db database.Database, id ids.ID) (ids.Set, error) {
func (ps *prefixedState) Blocking(db database.Database, id ids.ID) ([]ids.ID, error) {
p := wrappers.Packer{Bytes: make([]byte, 1+hashing.HashLen)}
p.PackByte(blockingID)

View File

@ -4,12 +4,18 @@
package queue
import (
"errors"
"github.com/ava-labs/gecko/database"
"github.com/ava-labs/gecko/database/prefixdb"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/utils/wrappers"
)
var (
errZeroID = errors.New("zero id")
)
type state struct{ jobs *Jobs }
func (s *state) SetInt(db database.Database, key []byte, size uint32) error {
@ -42,30 +48,37 @@ func (s *state) Job(db database.Database, key []byte) (Job, error) {
return s.jobs.parser.Parse(value)
}
func (s *state) SetIDs(db database.Database, key []byte, blocking ids.Set) error {
p := wrappers.Packer{Bytes: make([]byte, wrappers.IntLen+hashing.HashLen*blocking.Len())}
// IDs returns a slice of IDs from storage
func (s *state) IDs(db database.Database, prefix []byte) ([]ids.ID, error) {
idSlice := []ids.ID(nil)
iter := prefixdb.NewNested(prefix, db).NewIterator()
defer iter.Release()
p.PackInt(uint32(blocking.Len()))
for _, id := range blocking.List() {
p.PackFixedBytes(id.Bytes())
for iter.Next() {
keyID, err := ids.ToID(iter.Key())
if err != nil {
return nil, err
}
idSlice = append(idSlice, keyID)
}
return db.Put(key, p.Bytes)
return idSlice, nil
}
func (s *state) IDs(db database.Database, key []byte) (ids.Set, error) {
bytes, err := db.Get(key)
if err != nil {
return nil, err
// AddID saves an ID to the prefixed database
func (s *state) AddID(db database.Database, prefix []byte, key ids.ID) error {
if key.IsZero() {
return errZeroID
}
p := wrappers.Packer{Bytes: bytes}
blocking := ids.Set{}
for i := p.UnpackInt(); i > 0 && !p.Errored(); i-- {
id, _ := ids.ToID(p.UnpackFixedBytes(hashing.HashLen))
blocking.Add(id)
}
return blocking, p.Err
pdb := prefixdb.NewNested(prefix, db)
return pdb.Put(key.Bytes(), nil)
}
// RemoveID removes an ID from the prefixed database
func (s *state) RemoveID(db database.Database, prefix []byte, key ids.ID) error {
if key.IsZero() {
return errZeroID
}
pdb := prefixdb.NewNested(prefix, db)
return pdb.Delete(key.Bytes())
}

View File

@ -248,6 +248,7 @@ func (b *bootstrapper) finish() error {
}
func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge) error {
numExecuted := 0
for job, err := jobs.Pop(); err == nil; job, err = jobs.Pop() {
numBlocked.Dec()
if err := jobs.Execute(job); err != nil {
@ -256,6 +257,10 @@ func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge)
if err := jobs.Commit(); err != nil {
return err
}
numExecuted++
if numExecuted%common.StatusUpdateFrequency == 0 { // Periodically print progress
b.BootstrapConfig.Context.Log.Info("executed %d blocks", numExecuted)
}
}
return nil
}

View File

@ -61,8 +61,8 @@ func (tx *UniqueTx) refresh() {
// intermediate object whose state I must reflect
if status, err := tx.vm.state.Status(tx.ID()); err == nil {
tx.status = status
tx.unique = true
}
tx.unique = true
} else {
// If someone is in the cache, they must be up to date

View File

@ -35,7 +35,7 @@ const (
batchSize = 30
stateCacheSize = 10000
idCacheSize = 10000
txCacheSize = 10000
txCacheSize = 1000000
addressSep = "-"
)