Merged bootstrapping error handling

This commit is contained in:
StephenButtolph 2020-05-26 13:57:42 -04:00
commit e57219a648
11 changed files with 230 additions and 60 deletions

View File

@ -50,14 +50,16 @@ func (b *bootstrapper) Initialize(config BootstrapConfig) {
b.BootstrapConfig = config
b.VtxBlocked.SetParser(&vtxParser{
numAccepted: b.numBootstrappedVtx,
numDropped: b.numDroppedVtx,
log: config.Context.Log,
numAccepted: b.numBSVtx,
numDropped: b.numBSDroppedVtx,
state: b.State,
})
b.TxBlocked.SetParser(&txParser{
numAccepted: b.numBootstrappedTx,
numDropped: b.numDroppedTx,
log: config.Context.Log,
numAccepted: b.numBSTx,
numDropped: b.numBSDroppedTx,
vm: b.VM,
})
@ -160,7 +162,7 @@ func (b *bootstrapper) sendRequest(vtxID ids.ID) {
b.pending.Add(vtxID)
b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, vtxID)
b.numPendingRequests.Set(float64(b.pending.Len()))
b.numBSPendingRequests.Set(float64(b.pending.Len()))
}
func (b *bootstrapper) addVertex(vtx avalanche.Vertex) {
@ -191,21 +193,23 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
b.pending.Remove(vtxID)
if err := b.VtxBlocked.Push(&vertexJob{
numAccepted: b.numBootstrappedVtx,
numDropped: b.numDroppedVtx,
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSVtx,
numDropped: b.numBSDroppedVtx,
vtx: vtx,
}); err == nil {
b.numBlockedVtx.Inc()
b.numBSBlockedVtx.Inc()
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to vtxBlocked")
}
for _, tx := range vtx.Txs() {
if err := b.TxBlocked.Push(&txJob{
numAccepted: b.numBootstrappedVtx,
numDropped: b.numDroppedVtx,
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSTx,
numDropped: b.numBSDroppedTx,
tx: tx,
}); err == nil {
b.numBlockedTx.Inc()
b.numBSBlockedTx.Inc()
} else {
b.BootstrapConfig.Context.Log.Verbo("couldn't push to txBlocked")
}
@ -224,7 +228,7 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
}
numPending := b.pending.Len()
b.numPendingRequests.Set(float64(numPending))
b.numBSPendingRequests.Set(float64(numPending))
}
func (b *bootstrapper) finish() {
@ -233,8 +237,8 @@ func (b *bootstrapper) finish() {
}
b.BootstrapConfig.Context.Log.Info("bootstrapping finished fetching vertices. executing state transitions...")
b.executeAll(b.TxBlocked, b.numBlockedTx)
b.executeAll(b.VtxBlocked, b.numBlockedVtx)
b.executeAll(b.TxBlocked, b.numBSBlockedTx)
b.executeAll(b.VtxBlocked, b.numBSBlockedVtx)
// Start consensus
b.onFinished()

View File

@ -10,52 +10,52 @@ import (
)
// metrics groups the prometheus gauges and counters declared for this package.
type metrics struct {
numPendingRequests, numBlockedVtx, numBlockedTx prometheus.Gauge
numBootstrappedVtx, numDroppedVtx,
numBootstrappedTx, numDroppedTx prometheus.Counter
// Bootstrapping collectors: the pending-request and blocked vertex/tx values
// are gauges, the accepted and dropped vertex/tx values are counters.
numBSPendingRequests, numBSBlockedVtx, numBSBlockedTx prometheus.Gauge
numBSVtx, numBSDroppedVtx,
numBSTx, numBSDroppedTx prometheus.Counter
// Gauges without the BS prefix. Transitive.insert sets numVtxRequests,
// numTxRequests and numPendingVtx from the lengths of its tracking sets.
numPolls, numVtxRequests, numTxRequests, numPendingVtx prometheus.Gauge
}
// Initialize implements the Engine interface
func (m *metrics) Initialize(log logging.Logger, namespace string, registerer prometheus.Registerer) {
m.numPendingRequests = prometheus.NewGauge(
m.numBSPendingRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_bs_vtx_requests",
Help: "Number of pending bootstrap vertex requests",
})
m.numBlockedVtx = prometheus.NewGauge(
m.numBSBlockedVtx = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_bs_blocked_vts",
Help: "Number of blocked bootstrap vertices",
})
m.numBlockedTx = prometheus.NewGauge(
m.numBSBlockedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_bs_blocked_txs",
Help: "Number of blocked bootstrap txs",
})
m.numBootstrappedVtx = prometheus.NewCounter(
m.numBSVtx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_accepted_vts",
Help: "Number of accepted vertices",
})
m.numDroppedVtx = prometheus.NewCounter(
m.numBSDroppedVtx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_dropped_vts",
Help: "Number of dropped vertices",
})
m.numBootstrappedTx = prometheus.NewCounter(
m.numBSTx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_accepted_txs",
Help: "Number of accepted txs",
})
m.numDroppedTx = prometheus.NewCounter(
m.numBSDroppedTx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_dropped_txs",
@ -86,25 +86,25 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Help: "Number of blocked vertices",
})
if err := registerer.Register(m.numPendingRequests); err != nil {
if err := registerer.Register(m.numBSPendingRequests); err != nil {
log.Error("Failed to register av_bs_vtx_requests statistics due to %s", err)
}
if err := registerer.Register(m.numBlockedVtx); err != nil {
if err := registerer.Register(m.numBSBlockedVtx); err != nil {
log.Error("Failed to register av_bs_blocked_vts statistics due to %s", err)
}
if err := registerer.Register(m.numBlockedTx); err != nil {
if err := registerer.Register(m.numBSBlockedTx); err != nil {
log.Error("Failed to register av_bs_blocked_txs statistics due to %s", err)
}
if err := registerer.Register(m.numBootstrappedVtx); err != nil {
if err := registerer.Register(m.numBSVtx); err != nil {
log.Error("Failed to register av_bs_accepted_vts statistics due to %s", err)
}
if err := registerer.Register(m.numDroppedVtx); err != nil {
if err := registerer.Register(m.numBSDroppedVtx); err != nil {
log.Error("Failed to register av_bs_dropped_vts statistics due to %s", err)
}
if err := registerer.Register(m.numBootstrappedTx); err != nil {
if err := registerer.Register(m.numBSTx); err != nil {
log.Error("Failed to register av_bs_accepted_txs statistics due to %s", err)
}
if err := registerer.Register(m.numDroppedTx); err != nil {
if err := registerer.Register(m.numBSDroppedTx); err != nil {
log.Error("Failed to register av_bs_dropped_txs statistics due to %s", err)
}
if err := registerer.Register(m.numPolls); err != nil {

View File

@ -333,7 +333,7 @@ func (t *Transitive) insert(vtx avalanche.Vertex) {
// Track performance statistics
t.numVtxRequests.Set(float64(t.vtxReqs.Len()))
t.numTxRequests.Set(float64(t.missingTxs.Len()))
t.numBlockedVtx.Set(float64(t.pending.Len()))
t.numPendingVtx.Set(float64(t.pending.Len()))
}
func (t *Transitive) batch(txs []snowstorm.Tx, force, empty bool) {

View File

@ -4,15 +4,20 @@
package avalanche
import (
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
"github.com/ava-labs/gecko/snow/engine/common/queue"
"github.com/ava-labs/gecko/utils/logging"
)
// txParser turns serialized transactions into txJobs. Every job returned by
// Parse carries the parser's logger and its accepted/dropped counters.
type txParser struct {
	// log is handed to each parsed txJob.
	log logging.Logger
	// numAccepted is handed to each parsed txJob.
	numAccepted prometheus.Counter
	// numDropped is handed to each parsed txJob.
	numDropped prometheus.Counter
	// vm is presumably what decodes the raw transaction bytes; the decoding
	// step is not visible here, so confirm against Parse.
	vm DAGVM
}
@ -23,6 +28,7 @@ func (p *txParser) Parse(txBytes []byte) (queue.Job, error) {
return nil, err
}
return &txJob{
log: p.log,
numAccepted: p.numAccepted,
numDropped: p.numDropped,
tx: tx,
@ -30,6 +36,7 @@ func (p *txParser) Parse(txBytes []byte) (queue.Job, error) {
}
// txJob is the queue job that wraps a single transaction to be executed
// during bootstrapping.
type txJob struct {
	// log receives the warning emitted when the transaction fails verification.
	log logging.Logger
	// numAccepted is incremented by Execute on the accept path.
	numAccepted prometheus.Counter
	// numDropped is incremented by Execute when the job is rejected.
	numDropped prometheus.Counter
	// tx is the transaction this job executes.
	tx snowstorm.Tx
}
@ -44,19 +51,29 @@ func (t *txJob) MissingDependencies() ids.Set {
}
return missing
}
func (t *txJob) Execute() {
func (t *txJob) Execute() error {
if t.MissingDependencies().Len() != 0 {
t.numDropped.Inc()
return
return errors.New("attempting to accept a transaction with missing dependencies")
}
switch t.tx.Status() {
status := t.tx.Status()
switch status {
case choices.Unknown, choices.Rejected:
t.numDropped.Inc()
return fmt.Errorf("attempting to execute transaction with status %s", status)
case choices.Processing:
t.tx.Verify()
t.tx.Accept()
if err := t.tx.Verify(); err != nil {
t.log.Warn("transaction %s failed verification during bootstrapping due to %s",
t.tx.ID(), err)
}
t.numAccepted.Inc()
if err := t.tx.Accept(); err != nil {
return fmt.Errorf("failed to accept transaction in bootstrapping: %w", err)
}
}
return nil
}
// Bytes returns the byte representation reported by the wrapped transaction.
func (t *txJob) Bytes() []byte {
	return t.tx.Bytes()
}

View File

@ -4,15 +4,20 @@
package avalanche
import (
"errors"
"fmt"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/engine/common/queue"
"github.com/ava-labs/gecko/utils/logging"
)
// vtxParser turns serialized vertices into vertexJobs. Every job returned by
// Parse carries the parser's logger and its accepted/dropped counters.
type vtxParser struct {
	// log is handed to each parsed vertexJob.
	log logging.Logger
	// numAccepted is handed to each parsed vertexJob.
	numAccepted prometheus.Counter
	// numDropped is handed to each parsed vertexJob.
	numDropped prometheus.Counter
	// state is presumably what decodes the raw vertex bytes; the decoding
	// step is not visible here, so confirm against Parse.
	state State
}
@ -23,6 +28,7 @@ func (p *vtxParser) Parse(vtxBytes []byte) (queue.Job, error) {
return nil, err
}
return &vertexJob{
log: p.log,
numAccepted: p.numAccepted,
numDropped: p.numDropped,
vtx: vtx,
@ -30,6 +36,7 @@ func (p *vtxParser) Parse(vtxBytes []byte) (queue.Job, error) {
}
// vertexJob is the queue job that wraps a single vertex to be executed during
// bootstrapping.
type vertexJob struct {
	// log is the logger supplied by whoever created the job.
	log logging.Logger
	// numAccepted is incremented by Execute on the accept path.
	numAccepted prometheus.Counter
	// numDropped is incremented by Execute when the job is rejected.
	numDropped prometheus.Counter
	// vtx is the vertex this job executes.
	vtx avalanche.Vertex
}
@ -44,23 +51,28 @@ func (v *vertexJob) MissingDependencies() ids.Set {
}
return missing
}
func (v *vertexJob) Execute() {
func (v *vertexJob) Execute() error {
if v.MissingDependencies().Len() != 0 {
v.numDropped.Inc()
return
return errors.New("attempting to execute blocked vertex")
}
for _, tx := range v.vtx.Txs() {
if tx.Status() != choices.Accepted {
v.numDropped.Inc()
return
return errors.New("attempting to execute vertex with non-accepted transactions")
}
}
switch v.vtx.Status() {
status := v.vtx.Status()
switch status {
case choices.Unknown, choices.Rejected:
v.numDropped.Inc()
return fmt.Errorf("attempting to execute vertex with status %s", status)
case choices.Processing:
v.vtx.Accept()
v.numAccepted.Inc()
if err := v.vtx.Accept(); err != nil {
return fmt.Errorf("failed to accept vertex in bootstrapping: %w", err)
}
}
return nil
}
// Bytes returns the byte representation reported by the wrapped vertex.
func (v *vertexJob) Bytes() []byte {
	return v.vtx.Bytes()
}

View File

@ -12,7 +12,7 @@ type Job interface {
ID() ids.ID
MissingDependencies() ids.Set
Execute()
Execute() error
Bytes() []byte
}

View File

@ -79,7 +79,9 @@ func (j *Jobs) HasNext() (bool, error) {
// Execute ...
func (j *Jobs) Execute(job Job) error {
job.Execute()
if err := job.Execute(); err != nil {
return err
}
jobID := job.ID()
@ -128,6 +130,12 @@ func (j *Jobs) push(job Job) error {
}
func (j *Jobs) block(job Job, deps ids.Set) error {
if has, err := j.state.HasJob(j.db, job.ID()); err != nil {
return err
} else if has {
return errDuplicate
}
if err := j.state.SetJob(j.db, job); err != nil {
return err
}

View File

@ -5,12 +5,14 @@ package queue
import (
"bytes"
"errors"
"testing"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/ids"
)
// Test that a new queue can be created and that it is initially empty.
func TestNew(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -29,6 +31,8 @@ func TestNew(t *testing.T) {
}
}
// Test that a job can be added to a queue, and then the job can be removed from
// the queue after a shutdown.
func TestPushPop(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -46,7 +50,7 @@ func TestPushPop(t *testing.T) {
IDF: func() ids.ID { return id },
MissingDependenciesF: func() ids.Set { return ids.Set{} },
ExecuteF: func() {},
ExecuteF: func() error { return nil },
BytesF: func() []byte { return []byte{0} },
}
@ -94,6 +98,8 @@ func TestPushPop(t *testing.T) {
}
}
// Test that executing a job will cause a dependent job to be placed onto the
// ready queue.
func TestExecute(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -112,18 +118,18 @@ func TestExecute(t *testing.T) {
IDF: func() ids.ID { return id0 },
MissingDependenciesF: func() ids.Set { return ids.Set{} },
ExecuteF: func() { *executed0 = true },
ExecuteF: func() error { *executed0 = true; return nil },
BytesF: func() []byte { return []byte{0} },
}
id1 := ids.Empty.Prefix(0)
id1 := ids.Empty.Prefix(1)
executed1 := new(bool)
job1 := &TestJob{
T: t,
IDF: func() ids.ID { return id1 },
MissingDependenciesF: func() ids.Set { return ids.Set{id0.Key(): true} },
ExecuteF: func() { *executed1 = true },
ExecuteF: func() error { *executed1 = true; return nil },
BytesF: func() []byte { return []byte{1} },
}
@ -182,7 +188,8 @@ func TestExecute(t *testing.T) {
}
}
func TestDuplicatedPush(t *testing.T) {
// Test that a job that is ready to be executed can only be added once
func TestDuplicatedExecutablePush(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -199,7 +206,7 @@ func TestDuplicatedPush(t *testing.T) {
IDF: func() ids.ID { return id },
MissingDependenciesF: func() ids.Set { return ids.Set{} },
ExecuteF: func() {},
ExecuteF: func() error { return nil },
BytesF: func() []byte { return []byte{0} },
}
@ -250,3 +257,116 @@ func TestDuplicatedPush(t *testing.T) {
t.Fatalf("Shouldn't have a container ready to pop")
}
}
// TestDuplicatedNotExecutablePush checks that a job which still has a missing
// dependency can only be pushed once, and that it becomes ready to pop after
// the job it depends on has been executed.
func TestDuplicatedNotExecutablePush(t *testing.T) {
	parser := &TestParser{T: t}
	db := memdb.New()

	q, err := New(db)
	if err != nil {
		t.Fatal(err)
	}
	q.SetParser(parser)

	depID := ids.Empty.Prefix(0)
	blockedID := ids.Empty.Prefix(1)

	// blockedJob starts out waiting on depID.
	blockedJob := &TestJob{
		T:   t,
		IDF: func() ids.ID { return blockedID },
		MissingDependenciesF: func() ids.Set {
			missing := ids.Set{}
			missing.Add(depID)
			return missing
		},
		ExecuteF: func() error { return nil },
		BytesF:   func() []byte { return []byte{1} },
	}
	// Executing depJob clears blockedJob's missing dependencies.
	depJob := &TestJob{
		T:                    t,
		IDF:                  func() ids.ID { return depID },
		MissingDependenciesF: func() ids.Set { return ids.Set{} },
		ExecuteF: func() error {
			blockedJob.MissingDependenciesF = func() ids.Set { return ids.Set{} }
			return nil
		},
		BytesF: func() []byte { return []byte{0} },
	}

	// The first push of the blocked job succeeds; the second must be refused.
	if err := q.Push(blockedJob); err != nil {
		t.Fatal(err)
	}
	if err := q.Push(blockedJob); err == nil {
		t.Fatalf("should have errored on pushing a duplicate job")
	}
	if err := q.Commit(); err != nil {
		t.Fatal(err)
	}

	// Rebuild the queue on top of the same database.
	q, err = New(db)
	if err != nil {
		t.Fatal(err)
	}
	q.SetParser(parser)

	if err := q.Push(depJob); err != nil {
		t.Fatal(err)
	}

	ready, err := q.HasNext()
	if err != nil {
		t.Fatal(err)
	}
	if !ready {
		t.Fatalf("Should have a container ready to pop")
	}

	// Map the stored bytes back onto the two in-memory jobs.
	parser.ParseF = func(b []byte) (Job, error) {
		switch {
		case bytes.Equal(b, []byte{0}):
			return depJob, nil
		case bytes.Equal(b, []byte{1}):
			return blockedJob, nil
		}
		t.Fatalf("Unknown job")
		return nil, errors.New("Unknown job")
	}

	popped, err := q.Pop()
	if err != nil {
		t.Fatal(err)
	}
	if popped != depJob {
		t.Fatalf("Returned wrong job")
	}

	if err := q.Execute(depJob); err != nil {
		t.Fatal(err)
	}

	// The previously blocked job must now be available.
	ready, err = q.HasNext()
	if err != nil {
		t.Fatal(err)
	}
	if !ready {
		t.Fatalf("Should have a container ready to pop")
	}

	popped, err = q.Pop()
	if err != nil {
		t.Fatal(err)
	}
	if popped != blockedJob {
		t.Fatalf("Returned wrong job")
	}

	// Nothing else should be left in the queue.
	ready, err = q.HasNext()
	if err != nil {
		t.Fatal(err)
	}
	if ready {
		t.Fatalf("Shouldn't have a container ready to pop")
	}
}

View File

@ -4,6 +4,7 @@
package queue
import (
"errors"
"testing"
"github.com/ava-labs/gecko/ids"
@ -20,7 +21,7 @@ type TestJob struct {
IDF func() ids.ID
MissingDependenciesF func() ids.Set
ExecuteF func()
ExecuteF func() error
BytesF func() []byte
}
@ -55,12 +56,13 @@ func (j *TestJob) MissingDependencies() ids.Set {
}
// Execute ...
func (j *TestJob) Execute() {
func (j *TestJob) Execute() error {
if j.ExecuteF != nil {
j.ExecuteF()
return j.ExecuteF()
} else if j.CantExecute && j.T != nil {
j.T.Fatalf("Unexpectedly called Execute")
}
return errors.New("Unexpectedly called Execute")
}
// Bytes ...

View File

@ -5,6 +5,7 @@ package avm
import (
"errors"
"fmt"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
@ -100,13 +101,13 @@ func (tx *UniqueTx) ID() ids.ID { return tx.txID }
// Accept is called when the transaction was finalized as accepted by consensus
func (tx *UniqueTx) Accept() error {
defer tx.vm.db.Abort()
if err := tx.setStatus(choices.Accepted); err != nil {
tx.vm.ctx.Log.Error("Failed to accept tx %s due to %s", tx.txID, err)
return err
if s := tx.Status(); s != choices.Processing {
tx.vm.ctx.Log.Error("Failed to accept tx %s because the tx is in state %s", tx.txID, s)
return fmt.Errorf("transaction has invalid status: %s", s)
}
defer tx.vm.db.Abort()
// Remove spent utxos
for _, utxo := range tx.InputUTXOs() {
if utxo.Symbolic() {
@ -123,11 +124,16 @@ func (tx *UniqueTx) Accept() error {
// Add new utxos
for _, utxo := range tx.UTXOs() {
if err := tx.vm.state.FundUTXO(utxo); err != nil {
tx.vm.ctx.Log.Error("Failed to fund utxo %s due to %s", utxoID, err)
tx.vm.ctx.Log.Error("Failed to fund utxo %s due to %s", utxo.InputID(), err)
return err
}
}
if err := tx.setStatus(choices.Accepted); err != nil {
tx.vm.ctx.Log.Error("Failed to accept tx %s due to %s", tx.txID, err)
return err
}
txID := tx.ID()
commitBatch, err := tx.vm.db.CommitBatch()
if err != nil {

View File

@ -5,6 +5,7 @@ package ava
import (
"errors"
"fmt"
"github.com/ava-labs/gecko/cache"
"github.com/ava-labs/gecko/database"
@ -146,7 +147,7 @@ func (s *State) SetIDs(id ids.ID, idSlice []ids.ID) error {
bytes, err := s.Codec.Marshal(idSlice)
if err != nil {
return err
return fmt.Errorf("failed to marshal an ID array due to %w", err)
}
s.Cache.Put(id, idSlice)