debugging utxo error

This commit is contained in:
StephenButtolph 2020-05-20 11:37:01 -04:00
parent e21601b4fe
commit 896cafdcbb
11 changed files with 213 additions and 44 deletions

View File

@ -29,7 +29,7 @@ import (
)
const (
dbVersion = "v0.2.0"
dbVersion = "v0.2.-5"
)
// Results of parsing the CLI

View File

@ -43,8 +43,8 @@ borealis_bootstrap:
borealis_node:
hosts:
node1:
ansible_host: 34.207.133.167
# node1:
# ansible_host: 34.207.133.167
node2:
ansible_host: 107.23.241.199
node3:

View File

@ -43,14 +43,16 @@ func (b *bootstrapper) Initialize(config BootstrapConfig) {
b.BootstrapConfig = config
b.VtxBlocked.SetParser(&vtxParser{
numAccepted: b.numBootstrappedVtx,
numDropped: b.numDroppedVtx,
log: config.Context.Log,
numAccepted: b.numBSVtx,
numDropped: b.numBSDroppedVtx,
state: b.State,
})
b.TxBlocked.SetParser(&txParser{
numAccepted: b.numBootstrappedTx,
numDropped: b.numDroppedTx,
log: config.Context.Log,
numAccepted: b.numBSTx,
numDropped: b.numBSDroppedTx,
vm: b.VM,
})
@ -155,7 +157,7 @@ func (b *bootstrapper) sendRequest(vtxID ids.ID) {
b.pending.Add(vtxID)
b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, vtxID)
b.numPendingRequests.Set(float64(b.pending.Len()))
b.numBSPendingRequests.Set(float64(b.pending.Len()))
}
func (b *bootstrapper) addVertex(vtx avalanche.Vertex) {
@ -182,19 +184,21 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
b.pending.Remove(vtxID)
if err := b.VtxBlocked.Push(&vertexJob{
numAccepted: b.numBootstrappedVtx,
numDropped: b.numDroppedVtx,
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSVtx,
numDropped: b.numBSDroppedVtx,
vtx: vtx,
}); err == nil {
b.numBlockedVtx.Inc()
b.numBSBlockedVtx.Inc()
}
for _, tx := range vtx.Txs() {
if err := b.TxBlocked.Push(&txJob{
numAccepted: b.numBootstrappedVtx,
numDropped: b.numDroppedVtx,
log: b.BootstrapConfig.Context.Log,
numAccepted: b.numBSTx,
numDropped: b.numBSDroppedTx,
tx: tx,
}); err == nil {
b.numBlockedTx.Inc()
b.numBSBlockedTx.Inc()
}
}
@ -207,7 +211,7 @@ func (b *bootstrapper) storeVertex(vtx avalanche.Vertex) {
}
numPending := b.pending.Len()
b.numPendingRequests.Set(float64(numPending))
b.numBSPendingRequests.Set(float64(numPending))
}
func (b *bootstrapper) finish() {
@ -215,8 +219,8 @@ func (b *bootstrapper) finish() {
return
}
b.executeAll(b.TxBlocked, b.numBlockedTx)
b.executeAll(b.VtxBlocked, b.numBlockedVtx)
b.executeAll(b.TxBlocked, b.numBSBlockedTx)
b.executeAll(b.VtxBlocked, b.numBSBlockedVtx)
// Start consensus
b.onFinished()

View File

@ -10,52 +10,52 @@ import (
)
type metrics struct {
numPendingRequests, numBlockedVtx, numBlockedTx prometheus.Gauge
numBootstrappedVtx, numDroppedVtx,
numBootstrappedTx, numDroppedTx prometheus.Counter
numBSPendingRequests, numBSBlockedVtx, numBSBlockedTx prometheus.Gauge
numBSVtx, numBSDroppedVtx,
numBSTx, numBSDroppedTx prometheus.Counter
numPolls, numVtxRequests, numTxRequests, numPendingVtx prometheus.Gauge
}
// Initialize implements the Engine interface
func (m *metrics) Initialize(log logging.Logger, namespace string, registerer prometheus.Registerer) {
m.numPendingRequests = prometheus.NewGauge(
m.numBSPendingRequests = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_bs_vtx_requests",
Help: "Number of pending bootstrap vertex requests",
})
m.numBlockedVtx = prometheus.NewGauge(
m.numBSBlockedVtx = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_bs_blocked_vts",
Help: "Number of blocked bootstrap vertices",
})
m.numBlockedTx = prometheus.NewGauge(
m.numBSBlockedTx = prometheus.NewGauge(
prometheus.GaugeOpts{
Namespace: namespace,
Name: "av_bs_blocked_txs",
Help: "Number of blocked bootstrap txs",
})
m.numBootstrappedVtx = prometheus.NewCounter(
m.numBSVtx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_accepted_vts",
Help: "Number of accepted vertices",
})
m.numDroppedVtx = prometheus.NewCounter(
m.numBSDroppedVtx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_dropped_vts",
Help: "Number of dropped vertices",
})
m.numBootstrappedTx = prometheus.NewCounter(
m.numBSTx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_accepted_txs",
Help: "Number of accepted txs",
})
m.numDroppedTx = prometheus.NewCounter(
m.numBSDroppedTx = prometheus.NewCounter(
prometheus.CounterOpts{
Namespace: namespace,
Name: "av_bs_dropped_txs",
@ -86,25 +86,25 @@ func (m *metrics) Initialize(log logging.Logger, namespace string, registerer pr
Help: "Number of blocked vertices",
})
if err := registerer.Register(m.numPendingRequests); err != nil {
if err := registerer.Register(m.numBSPendingRequests); err != nil {
log.Error("Failed to register av_bs_vtx_requests statistics due to %s", err)
}
if err := registerer.Register(m.numBlockedVtx); err != nil {
if err := registerer.Register(m.numBSBlockedVtx); err != nil {
log.Error("Failed to register av_bs_blocked_vts statistics due to %s", err)
}
if err := registerer.Register(m.numBlockedTx); err != nil {
if err := registerer.Register(m.numBSBlockedTx); err != nil {
log.Error("Failed to register av_bs_blocked_txs statistics due to %s", err)
}
if err := registerer.Register(m.numBootstrappedVtx); err != nil {
if err := registerer.Register(m.numBSVtx); err != nil {
log.Error("Failed to register av_bs_accepted_vts statistics due to %s", err)
}
if err := registerer.Register(m.numDroppedVtx); err != nil {
if err := registerer.Register(m.numBSDroppedVtx); err != nil {
log.Error("Failed to register av_bs_dropped_vts statistics due to %s", err)
}
if err := registerer.Register(m.numBootstrappedTx); err != nil {
if err := registerer.Register(m.numBSTx); err != nil {
log.Error("Failed to register av_bs_accepted_txs statistics due to %s", err)
}
if err := registerer.Register(m.numDroppedTx); err != nil {
if err := registerer.Register(m.numBSDroppedTx); err != nil {
log.Error("Failed to register av_bs_dropped_txs statistics due to %s", err)
}
if err := registerer.Register(m.numPolls); err != nil {

View File

@ -333,7 +333,7 @@ func (t *Transitive) insert(vtx avalanche.Vertex) {
// Track performance statistics
t.numVtxRequests.Set(float64(t.vtxReqs.Len()))
t.numTxRequests.Set(float64(t.missingTxs.Len()))
t.numBlockedVtx.Set(float64(t.pending.Len()))
t.numPendingVtx.Set(float64(t.pending.Len()))
}
func (t *Transitive) batch(txs []snowstorm.Tx, force, empty bool) {

View File

@ -4,15 +4,19 @@
package avalanche
import (
"encoding/json"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/snowstorm"
"github.com/ava-labs/gecko/snow/engine/common/queue"
"github.com/ava-labs/gecko/utils/logging"
)
type txParser struct {
log logging.Logger
numAccepted, numDropped prometheus.Counter
vm DAGVM
}
@ -23,6 +27,7 @@ func (p *txParser) Parse(txBytes []byte) (queue.Job, error) {
return nil, err
}
return &txJob{
log: p.log,
numAccepted: p.numAccepted,
numDropped: p.numDropped,
tx: tx,
@ -30,6 +35,7 @@ func (p *txParser) Parse(txBytes []byte) (queue.Job, error) {
}
type txJob struct {
log logging.Logger
numAccepted, numDropped prometheus.Counter
tx snowstorm.Tx
}
@ -44,6 +50,11 @@ func (t *txJob) MissingDependencies() ids.Set {
}
return missing
}
var (
accepted ids.Set
)
func (t *txJob) Execute() {
if t.MissingDependencies().Len() != 0 {
t.numDropped.Inc()
@ -54,7 +65,24 @@ func (t *txJob) Execute() {
case choices.Unknown, choices.Rejected:
t.numDropped.Inc()
case choices.Processing:
t.tx.Verify()
if err := t.tx.Verify(); err != nil {
tx, _ := json.MarshalIndent(t.tx, "", " ")
parents, _ := json.MarshalIndent(t.tx.Dependencies(), "", " ")
t.log.Warn("transaction %s failed verification during bootstrapping due to %s\n"+
"Tx: %s\n"+
"Inputs: %s\n"+
"Dependencies: %s",
t.tx.ID(), err,
tx,
t.tx.InputIDs(),
parents)
}
if accepted.Overlaps(t.tx.InputIDs()) {
t.log.Fatal("Bootstrapping attempted to accept a double spend")
accepted.Union(t.tx.InputIDs())
}
t.tx.Accept()
t.numAccepted.Inc()
}

View File

@ -10,9 +10,11 @@ import (
"github.com/ava-labs/gecko/snow/choices"
"github.com/ava-labs/gecko/snow/consensus/avalanche"
"github.com/ava-labs/gecko/snow/engine/common/queue"
"github.com/ava-labs/gecko/utils/logging"
)
type vtxParser struct {
log logging.Logger
numAccepted, numDropped prometheus.Counter
state State
}
@ -23,6 +25,7 @@ func (p *vtxParser) Parse(vtxBytes []byte) (queue.Job, error) {
return nil, err
}
return &vertexJob{
log: p.log,
numAccepted: p.numAccepted,
numDropped: p.numDropped,
vtx: vtx,
@ -30,6 +33,7 @@ func (p *vtxParser) Parse(vtxBytes []byte) (queue.Job, error) {
}
type vertexJob struct {
log logging.Logger
numAccepted, numDropped prometheus.Counter
vtx avalanche.Vertex
}

View File

@ -128,6 +128,12 @@ func (j *Jobs) push(job Job) error {
}
func (j *Jobs) block(job Job, deps ids.Set) error {
if has, err := j.state.HasJob(j.db, job.ID()); err != nil {
return err
} else if has {
return errDuplicate
}
if err := j.state.SetJob(j.db, job); err != nil {
return err
}

View File

@ -5,12 +5,14 @@ package queue
import (
"bytes"
"errors"
"testing"
"github.com/ava-labs/gecko/database/memdb"
"github.com/ava-labs/gecko/ids"
)
// Test that creating a new queue can be created and that it is initially empty.
func TestNew(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -29,6 +31,8 @@ func TestNew(t *testing.T) {
}
}
// Test that a job can be added to a queue, and then the job can be removed from
// the queue after a shutdown.
func TestPushPop(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -94,6 +98,8 @@ func TestPushPop(t *testing.T) {
}
}
// Test that executing a job will cause a dependent job to be placed on to the
// ready queue
func TestExecute(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -116,7 +122,7 @@ func TestExecute(t *testing.T) {
BytesF: func() []byte { return []byte{0} },
}
id1 := ids.Empty.Prefix(0)
id1 := ids.Empty.Prefix(1)
executed1 := new(bool)
job1 := &TestJob{
T: t,
@ -182,7 +188,8 @@ func TestExecute(t *testing.T) {
}
}
func TestDuplicatedPush(t *testing.T) {
// Test that a job that is ready to be executed can only be added once
func TestDuplicatedExecutablePush(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
@ -250,3 +257,115 @@ func TestDuplicatedPush(t *testing.T) {
t.Fatalf("Shouldn't have a container ready to pop")
}
}
// Test that a job that isn't ready to be executed can only be added once
func TestDuplicatedNotExecutablePush(t *testing.T) {
parser := &TestParser{T: t}
db := memdb.New()
jobs, err := New(db)
if err != nil {
t.Fatal(err)
}
jobs.SetParser(parser)
id0 := ids.Empty.Prefix(0)
id1 := ids.Empty.Prefix(1)
job1 := &TestJob{
T: t,
IDF: func() ids.ID { return id1 },
MissingDependenciesF: func() ids.Set {
s := ids.Set{}
s.Add(id0)
return s
},
ExecuteF: func() {},
BytesF: func() []byte { return []byte{1} },
}
job0 := &TestJob{
T: t,
IDF: func() ids.ID { return id0 },
MissingDependenciesF: func() ids.Set { return ids.Set{} },
ExecuteF: func() {
job1.MissingDependenciesF = func() ids.Set { return ids.Set{} }
},
BytesF: func() []byte { return []byte{0} },
}
if err := jobs.Push(job1); err != nil {
t.Fatal(err)
}
if err := jobs.Push(job1); err == nil {
t.Fatalf("should have errored on pushing a duplicate job")
}
if err := jobs.Commit(); err != nil {
t.Fatal(err)
}
jobs, err = New(db)
if err != nil {
t.Fatal(err)
}
jobs.SetParser(parser)
if err := jobs.Push(job0); err != nil {
t.Fatal(err)
}
if hasNext, err := jobs.HasNext(); err != nil {
t.Fatal(err)
} else if !hasNext {
t.Fatalf("Should have a container ready to pop")
}
parser.ParseF = func(b []byte) (Job, error) {
if bytes.Equal(b, []byte{0}) {
return job0, nil
}
if bytes.Equal(b, []byte{1}) {
return job1, nil
}
t.Fatalf("Unknown job")
return nil, errors.New("Unknown job")
}
returnedBlockable, err := jobs.Pop()
if err != nil {
t.Fatal(err)
}
if returnedBlockable != job0 {
t.Fatalf("Returned wrong job")
}
if err := jobs.Execute(job0); err != nil {
t.Fatal(err)
}
if hasNext, err := jobs.HasNext(); err != nil {
t.Fatal(err)
} else if !hasNext {
t.Fatalf("Should have a container ready to pop")
}
returnedBlockable, err = jobs.Pop()
if err != nil {
t.Fatal(err)
}
if returnedBlockable != job1 {
t.Fatalf("Returned wrong job")
}
if hasNext, err := jobs.HasNext(); err != nil {
t.Fatal(err)
} else if hasNext {
t.Fatalf("Shouldn't have a container ready to pop")
}
}

View File

@ -100,13 +100,13 @@ func (tx *UniqueTx) ID() ids.ID { return tx.txID }
// Accept is called when the transaction was finalized as accepted by consensus
func (tx *UniqueTx) Accept() {
defer tx.vm.db.Abort()
if err := tx.setStatus(choices.Accepted); err != nil {
tx.vm.ctx.Log.Error("Failed to accept tx %s due to %s", tx.txID, err)
if s := tx.Status(); s != choices.Processing {
tx.vm.ctx.Log.Error("Failed to accept tx %s because the tx is in state %s", tx.txID, s)
return
}
defer tx.vm.db.Abort()
// Remove spent utxos
for _, utxo := range tx.InputUTXOs() {
if utxo.Symbolic() {
@ -123,20 +123,27 @@ func (tx *UniqueTx) Accept() {
// Add new utxos
for _, utxo := range tx.UTXOs() {
if err := tx.vm.state.FundUTXO(utxo); err != nil {
tx.vm.ctx.Log.Error("Failed to fund utxo %s due to %s", utxoID, err)
tx.vm.ctx.Log.Error("Failed to fund utxo %s due to %s", utxo.InputID(), err)
return
}
}
if err := tx.setStatus(choices.Accepted); err != nil {
tx.vm.ctx.Log.Error("Failed to accept tx %s due to %s", tx.txID, err)
return
}
txID := tx.ID()
commitBatch, err := tx.vm.db.CommitBatch()
if err != nil {
tx.vm.ctx.Log.Error("Failed to calculate CommitBatch for %s due to %s", txID, err)
tx.setStatus(choices.Processing)
return
}
if err := tx.ExecuteWithSideEffects(tx.vm, commitBatch); err != nil {
tx.vm.ctx.Log.Error("Failed to commit accept %s due to %s", txID, err)
tx.setStatus(choices.Processing)
return
}

View File

@ -5,6 +5,7 @@ package ava
import (
"errors"
"fmt"
"github.com/ava-labs/gecko/cache"
"github.com/ava-labs/gecko/database"
@ -146,7 +147,7 @@ func (s *State) SetIDs(id ids.ID, idSlice []ids.ID) error {
bytes, err := s.Codec.Marshal(idSlice)
if err != nil {
return err
return fmt.Errorf("failed to marshal an ID array due to %w", err)
}
s.Cache.Put(id, idSlice)