Merge branch 'master' into ansible-service

This commit is contained in:
Stephen Buttolph 2020-05-12 22:42:33 -04:00 committed by GitHub
commit f737368d65
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
26 changed files with 1622 additions and 181 deletions

6
.editorconfig Normal file
View File

@ -0,0 +1,6 @@
# https://editorconfig.org/
[*]
end_of_line = lf
insert_final_newline = true
trim_trailing_whitespace = true

View File

@ -234,7 +234,7 @@ func (s *Voting) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32,
peer, exists := s.conns.GetPeerID(validatorID)
if !exists {
s.log.Debug("attempted to send a Get message to a disconnected validator: %s", validatorID)
s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID, containerID) })
s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID) })
return // Validator is not connected
}

View File

@ -1,4 +1,8 @@
#!/bin/bash -e
#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
# Ted: contact me when you make any changes

View File

@ -1,4 +1,9 @@
#!/bin/bash -e
#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
SRC_DIR="$(dirname "${BASH_SOURCE[0]}")"
export GOPATH="$SRC_DIR/.build_image_gopath"
WORKPREFIX="$GOPATH/src/github.com/ava-labs/"

View File

@ -1,4 +1,8 @@
#!/bin/bash -e
#!/bin/bash
set -o errexit
set -o nounset
set -o pipefail
# Ted: contact me when you make any changes

View File

@ -16,7 +16,7 @@ type snowmanBlock struct {
// block that this node contains. For the genesis, this value will be nil
blk Block
// shouldFalter is set to true if this node, and all its decendants received
// shouldFalter is set to true if this node, and all its descendants received
// less than Alpha votes
shouldFalter bool

View File

@ -149,7 +149,7 @@ func (ts *Topological) Preference() ids.ID { return ts.tail }
// During the sort, votes are pushed towards the genesis. To prevent iterating
// over all blocks that had unsuccessful polls, we set a flag on the block to
// know that any future traversal through that block should register an
// unsuccessful poll on that block and every decendant block.
// unsuccessful poll on that block and every descendant block.
//
// The complexity of this function is:
// - Runtime = 3 * |live set| + |votes|
@ -408,7 +408,7 @@ func (ts *Topological) getPreferredDecendent(blkID ids.ID) ids.ID {
// accept the preferred child of the provided snowman block. By accepting the
// preferred child, all other children will be rejected. When these children are
// rejected, all their decendants will be rejected.
// rejected, all their descendants will be rejected.
func (ts *Topological) accept(n *snowmanBlock) {
// We are finalizing the block's child, so we need to get the preference
pref := n.sb.Preference()
@ -451,11 +451,11 @@ func (ts *Topological) accept(n *snowmanBlock) {
rejects = append(rejects, childID)
}
// reject all the decendants of the blocks we just rejected
// reject all the descendants of the blocks we just rejected
ts.rejectTransitively(rejects)
}
// Takes in a list of rejected ids and rejects all decendants of these IDs
// Takes in a list of rejected ids and rejects all descendants of these IDs
func (ts *Topological) rejectTransitively(rejected []ids.ID) {
// the rejected array is treated as a queue, with the next element at index
// 0 and the last element at the end of the slice.

View File

@ -30,6 +30,9 @@ type bootstrapper struct {
metrics
common.Bootstrapper
// vtxReqs prevents asking validators for the same vertex
vtxReqs common.Requests
pending ids.Set
finished bool
onFinished func()
@ -90,16 +93,22 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) {
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) {
b.BootstrapConfig.Context.Log.Verbo("Put called for vertexID %s", vtxID)
if !b.pending.Contains(vtxID) {
return
}
vtx, err := b.State.ParseVertex(vtxBytes)
if err != nil {
b.BootstrapConfig.Context.Log.Debug("ParseVertex failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: vtxBytes})
b.GetFailed(vdr, requestID, vtxID)
b.GetFailed(vdr, requestID)
return
}
if !b.pending.Contains(vtx.ID()) {
b.BootstrapConfig.Context.Log.Debug("Validator %s sent an unrequested vertex:\n%s",
vdr,
formatting.DumpBytes{Bytes: vtxBytes})
b.GetFailed(vdr, requestID)
return
}
@ -107,7 +116,16 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxB
}
// GetFailed ...
func (b *bootstrapper) GetFailed(_ ids.ShortID, _ uint32, vtxID ids.ID) { b.sendRequest(vtxID) }
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) {
vtxID, ok := b.vtxReqs.Remove(vdr, requestID)
if !ok {
b.BootstrapConfig.Context.Log.Debug("GetFailed called without sending the corresponding Get message from %s",
vdr)
return
}
b.sendRequest(vtxID)
}
func (b *bootstrapper) fetch(vtxID ids.ID) {
if b.pending.Contains(vtxID) {
@ -131,6 +149,9 @@ func (b *bootstrapper) sendRequest(vtxID ids.ID) {
validatorID := validators[0].ID()
b.RequestID++
b.vtxReqs.RemoveAny(vtxID)
b.vtxReqs.Add(validatorID, b.RequestID, vtxID)
b.pending.Add(vtxID)
b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, vtxID)

View File

@ -289,7 +289,6 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) {
bs.ForceAccepted(acceptedIDs)
state.getVertex = nil
sender.GetF = nil
state.parseVertex = func(vtxBytes []byte) (avalanche.Vertex, error) {
switch {
@ -1008,3 +1007,111 @@ func TestBootstrapperPartialFetch(t *testing.T) {
t.Fatalf("wrong number pending")
}
}
func TestBootstrapperWrongIDByzantineResponse(t *testing.T) {
config, peerID, sender, state, _ := newConfig(t)
vtxID0 := ids.Empty.Prefix(0)
vtxID1 := ids.Empty.Prefix(1)
vtxBytes0 := []byte{0}
vtxBytes1 := []byte{1}
vtx0 := &Vtx{
id: vtxID0,
height: 0,
status: choices.Processing,
bytes: vtxBytes0,
}
vtx1 := &Vtx{
id: vtxID1,
height: 0,
status: choices.Processing,
bytes: vtxBytes1,
}
bs := bootstrapper{}
bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
bs.Initialize(config)
acceptedIDs := ids.Set{}
acceptedIDs.Add(
vtxID0,
)
state.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
switch {
case vtxID.Equals(vtxID0):
return nil, errUnknownVertex
default:
t.Fatal(errUnknownVertex)
panic(errUnknownVertex)
}
}
requestID := new(uint32)
sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
if !vdr.Equals(peerID) {
t.Fatalf("Should have requested vertex from %s, requested from %s", peerID, vdr)
}
switch {
case vtxID.Equals(vtxID0):
default:
t.Fatalf("Requested unknown vertex")
}
*requestID = reqID
}
bs.ForceAccepted(acceptedIDs)
state.getVertex = nil
sender.GetF = nil
state.parseVertex = func(vtxBytes []byte) (avalanche.Vertex, error) {
switch {
case bytes.Equal(vtxBytes, vtxBytes0):
return vtx0, nil
case bytes.Equal(vtxBytes, vtxBytes1):
return vtx1, nil
}
t.Fatal(errParsedUnknownVertex)
return nil, errParsedUnknownVertex
}
state.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
switch {
case vtxID.Equals(vtxID0):
return vtx0, nil
case vtxID.Equals(vtxID1):
return vtx1, nil
default:
t.Fatal(errUnknownVertex)
panic(errUnknownVertex)
}
}
finished := new(bool)
bs.onFinished = func() { *finished = true }
sender.CantGet = false
bs.Put(peerID, *requestID, vtxID0, vtxBytes1)
sender.CantGet = true
bs.Put(peerID, *requestID, vtxID0, vtxBytes0)
state.parseVertex = nil
state.edge = nil
bs.onFinished = nil
if !*finished {
t.Fatalf("Bootstrapping should have finished")
}
if vtx0.Status() != choices.Accepted {
t.Fatalf("Vertex should be accepted")
}
if vtx1.Status() != choices.Processing {
t.Fatalf("Vertex should be processing")
}
}

View File

@ -76,10 +76,8 @@ func (i *issuer) Update() {
}
i.t.RequestID++
polled := false
if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) {
i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes())
polled = true
} else if numVdrs < p.K {
i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID)
}
@ -89,9 +87,7 @@ func (i *issuer) Update() {
i.t.txBlocked.Fulfill(tx.ID())
}
if polled && len(i.t.polls.m) < i.t.Params.ConcurrentRepolls {
i.t.repoll()
}
i.t.repoll()
}
type vtxIssuer struct{ i *issuer }

View File

@ -23,8 +23,10 @@ type Transitive struct {
polls polls // track people I have asked for their preference
// vtxReqs prevents asking validators for the same vertex
vtxReqs common.Requests
// missingTxs tracks transactions that are missing
vtxReqs, missingTxs, pending ids.Set
missingTxs, pending ids.Set
// vtxBlocked tracks operations that are blocked on vertices
// txBlocked tracks operations that are blocked on transactions
@ -115,22 +117,27 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxByt
t.Config.Context.Log.Debug("ParseVertex failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: vtxBytes})
t.GetFailed(vdr, requestID, vtxID)
t.GetFailed(vdr, requestID)
return
}
t.insertFrom(vdr, vtx)
}
// GetFailed implements the Engine interface
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, vtxID ids.ID) {
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) {
if !t.bootstrapped {
t.bootstrapper.GetFailed(vdr, requestID, vtxID)
t.bootstrapper.GetFailed(vdr, requestID)
return
}
vtxID, ok := t.vtxReqs.Remove(vdr, requestID)
if !ok {
t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s",
vdr)
return
}
t.pending.Remove(vtxID)
t.vtxBlocked.Abandon(vtxID)
t.vtxReqs.Remove(vtxID)
if t.vtxReqs.Len() == 0 {
for _, txID := range t.missingTxs.List() {
@ -142,7 +149,6 @@ func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, vtxID ids.ID)
// Track performance statistics
t.numVtxRequests.Set(float64(t.vtxReqs.Len()))
t.numTxRequests.Set(float64(t.missingTxs.Len()))
t.numBlockedVtx.Set(float64(t.pending.Len()))
}
// PullQuery implements the Engine interface
@ -167,14 +173,22 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID)
}
// PushQuery implements the Engine interface
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtx []byte) {
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) {
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", vtxID)
return
}
t.Put(vdr, requestID, vtxID, vtx)
t.PullQuery(vdr, requestID, vtxID)
vtx, err := t.Config.State.ParseVertex(vtxBytes)
if err != nil {
t.Config.Context.Log.Warn("ParseVertex failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: vtxBytes})
return
}
t.insertFrom(vdr, vtx)
t.PullQuery(vdr, requestID, vtx.ID())
}
// Chits implements the Engine interface
@ -220,8 +234,16 @@ func (t *Transitive) Notify(msg common.Message) {
}
func (t *Transitive) repoll() {
if len(t.polls.m) >= t.Params.ConcurrentRepolls {
return
}
txs := t.Config.VM.PendingTxs()
t.batch(txs, false /*=force*/, true /*=empty*/)
for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ {
t.batch(nil, false /*=force*/, true /*=empty*/)
}
}
func (t *Transitive) reinsertFrom(vdr ids.ShortID, vtxID ids.ID) bool {
@ -266,7 +288,7 @@ func (t *Transitive) insert(vtx avalanche.Vertex) {
vtxID := vtx.ID()
t.pending.Add(vtxID)
t.vtxReqs.Remove(vtxID)
t.vtxReqs.RemoveAny(vtxID)
i := &issuer{
t: t,
@ -395,10 +417,10 @@ func (t *Transitive) sendRequest(vdr ids.ShortID, vtxID ids.ID) {
return
}
t.vtxReqs.Add(vtxID)
t.RequestID++
t.vtxReqs.Add(vdr, t.RequestID, vtxID)
t.Config.Sender.Get(vdr, t.RequestID, vtxID)
t.numVtxRequests.Set(float64(t.vtxReqs.Len())) // Tracks performance statistics
t.RequestID++
t.Config.Sender.Get(vdr, t.RequestID, vtxID)
}

View File

@ -40,6 +40,7 @@ func TestEngineShutdown(t *testing.T) {
t.Fatal("Shutting down the Transitive did not shutdown the VM")
}
}
func TestEngineAdd(t *testing.T) {
config := DefaultConfig()
@ -85,7 +86,9 @@ func TestEngineAdd(t *testing.T) {
}
asked := new(bool)
sender.GetF = func(inVdr ids.ShortID, _ uint32, vtxID ids.ID) {
reqID := new(uint32)
sender.GetF = func(inVdr ids.ShortID, requestID uint32, vtxID ids.ID) {
*reqID = requestID
if *asked {
t.Fatalf("Asked multiple times")
}
@ -119,7 +122,7 @@ func TestEngineAdd(t *testing.T) {
st.parseVertex = func(b []byte) (avalanche.Vertex, error) { return nil, errFailedParsing }
te.Put(vdr.ID(), 0, vtx.parents[0].ID(), nil)
te.Put(vdr.ID(), *reqID, vtx.parents[0].ID(), nil)
st.parseVertex = nil
@ -485,7 +488,9 @@ func TestEngineMultipleQuery(t *testing.T) {
}
asked := new(bool)
sender.GetF = func(inVdr ids.ShortID, _ uint32, vtxID ids.ID) {
reqID := new(uint32)
sender.GetF = func(inVdr ids.ShortID, requestID uint32, vtxID ids.ID) {
*reqID = requestID
if *asked {
t.Fatalf("Asked multiple times")
}
@ -512,7 +517,7 @@ func TestEngineMultipleQuery(t *testing.T) {
// Should be dropped because the query was marked as failed
te.Chits(vdr1.ID(), *queryRequestID, s0)
te.GetFailed(vdr0.ID(), 0, vtx1.ID())
te.GetFailed(vdr0.ID(), *reqID)
if vtx0.Status() != choices.Accepted {
t.Fatalf("Should have executed vertex")
@ -598,6 +603,12 @@ func TestEngineAbandonResponse(t *testing.T) {
st := &stateTest{t: t}
config.State = st
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
@ -629,8 +640,13 @@ func TestEngineAbandonResponse(t *testing.T) {
te.Initialize(config)
te.finishBootstrapping()
reqID := new(uint32)
sender.GetF = func(vID ids.ShortID, requestID uint32, vtxID ids.ID) {
*reqID = requestID
}
te.PullQuery(vdr.ID(), 0, vtx.ID())
te.GetFailed(vdr.ID(), 0, vtx.ID())
te.GetFailed(vdr.ID(), *reqID)
if len(te.vtxBlocked) != 0 {
t.Fatalf("Should have removed blocking event")
@ -2098,7 +2114,7 @@ func TestEngineReissueAbortedVertex(t *testing.T) {
sender.GetF = nil
st.parseVertex = nil
te.GetFailed(vdrID, *requestID, vtxID0)
te.GetFailed(vdrID, *requestID)
requested := new(bool)
sender.GetF = func(_ ids.ShortID, _ uint32, vtxID ids.ID) {
@ -2587,3 +2603,375 @@ func TestEngineGossip(t *testing.T) {
t.Fatalf("Should have gossiped the vertex")
}
}
func TestEngineInvalidVertexIgnoredFromUnexpectedPeer(t *testing.T) {
config := DefaultConfig()
vdr := validators.GenerateRandomValidator(1)
secondVdr := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
config.Validators = vals
vals.Add(vdr)
vals.Add(secondVdr)
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
st := &stateTest{t: t}
config.State = st
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
bytes: []byte{0},
}
vts := []avalanche.Vertex{gVtx}
utxos := []ids.ID{GenerateID(), GenerateID()}
tx0 := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx0.Ins.Add(utxos[0])
tx1 := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx1.Ins.Add(utxos[1])
vtx0 := &Vtx{
parents: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx0},
height: 1,
status: choices.Unknown,
bytes: []byte{1},
}
vtx1 := &Vtx{
parents: []avalanche.Vertex{vtx0},
id: GenerateID(),
txs: []snowstorm.Tx{tx1},
height: 2,
status: choices.Processing,
bytes: []byte{2},
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
parsed := new(bool)
st.parseVertex = func(b []byte) (avalanche.Vertex, error) {
switch {
case bytes.Equal(b, vtx1.Bytes()):
*parsed = true
return vtx1, nil
}
return nil, errUnknownVertex
}
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
if !*parsed {
return nil, errUnknownVertex
}
switch {
case vtxID.Equals(vtx1.ID()):
return vtx1, nil
}
return nil, errUnknownVertex
}
reqID := new(uint32)
sender.GetF = func(reqVdr ids.ShortID, requestID uint32, vtxID ids.ID) {
*reqID = requestID
if !reqVdr.Equals(vdr.ID()) {
t.Fatalf("Wrong validator requested")
}
if !vtxID.Equals(vtx0.ID()) {
t.Fatalf("Wrong vertex requested")
}
}
te.PushQuery(vdr.ID(), 0, vtx1.ID(), vtx1.Bytes())
te.Put(secondVdr.ID(), *reqID, vtx0.ID(), []byte{3})
*parsed = false
st.parseVertex = func(b []byte) (avalanche.Vertex, error) {
switch {
case bytes.Equal(b, vtx0.Bytes()):
*parsed = true
return vtx0, nil
}
return nil, errUnknownVertex
}
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
if !*parsed {
return nil, errUnknownVertex
}
switch {
case vtxID.Equals(vtx0.ID()):
return vtx0, nil
}
return nil, errUnknownVertex
}
sender.CantPushQuery = false
sender.CantChits = false
vtx0.status = choices.Processing
te.Put(vdr.ID(), *reqID, vtx0.ID(), vtx0.Bytes())
prefs := te.Consensus.Preferences()
if !prefs.Contains(vtx1.ID()) {
t.Fatalf("Shouldn't have abandoned the pending vertex")
}
}
func TestEnginePushQueryRequestIDConflict(t *testing.T) {
config := DefaultConfig()
vdr := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
config.Validators = vals
vals.Add(vdr)
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
st := &stateTest{t: t}
config.State = st
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
bytes: []byte{0},
}
vts := []avalanche.Vertex{gVtx}
utxos := []ids.ID{GenerateID(), GenerateID()}
tx0 := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx0.Ins.Add(utxos[0])
tx1 := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx1.Ins.Add(utxos[1])
vtx0 := &Vtx{
parents: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx0},
height: 1,
status: choices.Unknown,
bytes: []byte{1},
}
vtx1 := &Vtx{
parents: []avalanche.Vertex{vtx0},
id: GenerateID(),
txs: []snowstorm.Tx{tx1},
height: 2,
status: choices.Processing,
bytes: []byte{2},
}
randomVtxID := GenerateID()
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
parsed := new(bool)
st.parseVertex = func(b []byte) (avalanche.Vertex, error) {
switch {
case bytes.Equal(b, vtx1.Bytes()):
*parsed = true
return vtx1, nil
}
return nil, errUnknownVertex
}
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
if !*parsed {
return nil, errUnknownVertex
}
switch {
case vtxID.Equals(vtx1.ID()):
return vtx1, nil
}
return nil, errUnknownVertex
}
reqID := new(uint32)
sender.GetF = func(reqVdr ids.ShortID, requestID uint32, vtxID ids.ID) {
*reqID = requestID
if !reqVdr.Equals(vdr.ID()) {
t.Fatalf("Wrong validator requested")
}
if !vtxID.Equals(vtx0.ID()) {
t.Fatalf("Wrong vertex requested")
}
}
te.PushQuery(vdr.ID(), 0, vtx1.ID(), vtx1.Bytes())
sender.GetF = nil
sender.CantGet = false
te.PushQuery(vdr.ID(), *reqID, randomVtxID, []byte{3})
*parsed = false
st.parseVertex = func(b []byte) (avalanche.Vertex, error) {
switch {
case bytes.Equal(b, vtx0.Bytes()):
*parsed = true
return vtx0, nil
}
return nil, errUnknownVertex
}
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
if !*parsed {
return nil, errUnknownVertex
}
switch {
case vtxID.Equals(vtx0.ID()):
return vtx0, nil
}
return nil, errUnknownVertex
}
sender.CantPushQuery = false
sender.CantChits = false
vtx0.status = choices.Processing
te.Put(vdr.ID(), *reqID, vtx0.ID(), vtx0.Bytes())
prefs := te.Consensus.Preferences()
if !prefs.Contains(vtx1.ID()) {
t.Fatalf("Shouldn't have abandoned the pending vertex")
}
}
func TestEngineAggressivePolling(t *testing.T) {
config := DefaultConfig()
config.Params.ConcurrentRepolls = 3
vdr := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
config.Validators = vals
vals.Add(vdr)
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
st := &stateTest{t: t}
config.State = st
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
bytes: []byte{0},
}
vts := []avalanche.Vertex{gVtx}
utxos := []ids.ID{GenerateID(), GenerateID()}
tx0 := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx0.Ins.Add(utxos[0])
tx1 := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx1.Ins.Add(utxos[1])
vtx := &Vtx{
parents: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx0},
height: 1,
status: choices.Processing,
bytes: []byte{1},
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
parsed := new(bool)
st.parseVertex = func(b []byte) (avalanche.Vertex, error) {
switch {
case bytes.Equal(b, vtx.Bytes()):
*parsed = true
return vtx, nil
}
return nil, errUnknownVertex
}
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
if !*parsed {
return nil, errUnknownVertex
}
switch {
case vtxID.Equals(vtx.ID()):
return vtx, nil
}
return nil, errUnknownVertex
}
numPushQueries := new(int)
sender.PushQueryF = func(ids.ShortSet, uint32, ids.ID, []byte) { *numPushQueries++ }
numPullQueries := new(int)
sender.PullQueryF = func(ids.ShortSet, uint32, ids.ID) { *numPullQueries++ }
te.Put(vdr.ID(), 0, vtx.ID(), vtx.Bytes())
if *numPushQueries != 1 {
t.Fatalf("should have issued one push query")
}
if *numPullQueries != 2 {
t.Fatalf("should have issued one pull query")
}
}

View File

@ -59,10 +59,7 @@ func (v *voter) Update() {
}
v.t.Config.Context.Log.Verbo("Avalanche engine can't quiesce")
if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls {
v.t.repoll()
}
v.t.repoll()
}
func (v *voter) bubbleVotes(votes ids.UniqueBag) ids.UniqueBag {

View File

@ -34,75 +34,181 @@ type ExternalHandler interface {
// FrontierHandler defines how a consensus engine reacts to frontier messages
// from other validators
type FrontierHandler interface {
// GetAcceptedFrontier notifies this consensus engine that its accepted
// frontier is requested by the specified validator
// Notify this engine of a request for the accepted frontier of vertices.
//
// The accepted frontier is the set of accepted vertices that do not have
// any accepted descendants.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID. However, the validatorID is
// assumed to be authenticated.
//
// This engine should respond with an AcceptedFrontier message with the same
// requestID, and the engine's current accepted frontier.
GetAcceptedFrontier(validatorID ids.ShortID, requestID uint32)
// AcceptedFrontier notifies this consensus engine of the specified
// validators current accepted frontier
AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set)
// Notify this engine of an accepted frontier.
//
// This function can be called by any validator. It is not safe to assume
// this message is in response to a GetAcceptedFrontier message, is utilizing a
// unique requestID, or that the containerIDs are from a valid frontier.
// However, the validatorID is assumed to be authenticated.
AcceptedFrontier(
validatorID ids.ShortID,
requestID uint32,
containerIDs ids.Set,
)
// GetAcceptedFrontierFailed notifies this consensus engine that the
// requested accepted frontier from the specified validator should be
// considered lost
// Notify this engine that a get accepted frontier request it issued has
// failed.
//
// This function will be called if the engine sent a GetAcceptedFrontier
// message that is not anticipated to be responded to. This could be because
// the recipient of the message is unknown or if the message request has
// timed out.
//
// The validatorID, and requestID, are assumed to be the same as those sent
// in the GetAcceptedFrontier message.
GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32)
}
// AcceptedHandler defines how a consensus engine reacts to messages pertaining
// to accepted containers from other validators
type AcceptedHandler interface {
// GetAccepted notifies this consensus engine that it should send the set of
// containerIDs that it has accepted from the provided set to the specified
// validator
// Notify this engine of a request to filter non-accepted vertices.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID. However, the validatorID is
// assumed to be authenticated.
//
// This engine should respond with an Accepted message with the same
// requestID, and the subset of the containerIDs that this node has decided
// are accepted.
GetAccepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set)
// Accepted notifies this consensus engine of a set of accepted containerIDs
// Notify this engine of a set of accepted vertices.
//
// This function can be called by any validator. It is not safe to assume
// this message is in response to a GetAccepted message, is utilizing a
// unique requestID, or that the containerIDs are a subset of the
// containerIDs from a GetAccepted message. However, the validatorID is
// assumed to be authenticated.
Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set)
// GetAcceptedFailed notifies this consensus engine that the requested
// accepted containers requested from the specified validator should be
// considered lost
// Notify this engine that a get accepted request it issued has failed.
//
// This function will be called if the engine sent a GetAccepted message
// that is not anticipated to be responded to. This could be because the
// recipient of the message is unknown or if the message request has timed
// out.
//
// The validatorID, and requestID, are assumed to be the same as those sent
// in the GetAccepted message.
GetAcceptedFailed(validatorID ids.ShortID, requestID uint32)
}
// FetchHandler defines how a consensus engine reacts to retrieval messages from
// other validators
type FetchHandler interface {
// Get notifies this consensus engine that the specified validator requested
// that this engine send the specified container to it
// Notify this engine of a request for a container.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID. It is also not safe to
// assume the requested containerID exists. However, the validatorID is
// assumed to be authenticated.
//
// There should never be a situation where a virtuous node sends a Get
// request to another virtuous node that does not have the requested
// container. Unless that container was pruned from the active set.
//
// This engine should respond with a Put message with the same requestID if
// the container was locally available. Otherwise, the message can be safely
// dropped.
Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
// Put the container with the specified ID and body.
// Notify this engine of a container.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID or even that the containerID
// matches the ID of the container bytes. However, the validatorID is
// assumed to be authenticated.
//
// This engine needs to request and receive missing ancestors of the
// container before adding the container to consensus. Once all ancestor
// containers are added, pushes the container into the consensus.
Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte)
Put(
validatorID ids.ShortID,
requestID uint32,
containerID ids.ID,
container []byte,
)
// Notify this engine that a get request it issued has failed.
GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
//
// This function will be called if the engine sent a Get message that is not
// anticipated to be responded to. This could be because the recipient of
// the message is unknown or if the message request has timed out.
//
// The validatorID and requestID are assumed to be the same as those sent in
// the Get message.
GetFailed(validatorID ids.ShortID, requestID uint32)
}
// QueryHandler defines how a consensus engine reacts to query messages from
// other validators
type QueryHandler interface {
// Notify this engine that the specified validator queried it about the
// specified container. That is, the validator would like to know whether
// this engine prefers the specified container. If the ancestry of the
// container is incomplete, or the container is unknown, request the missing
// data. Once complete, sends this validator the current preferences.
// Notify this engine of a request for our preferences.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID. However, the validatorID is
// assumed to be authenticated.
//
// If the container or its ancestry is incomplete, this engine is expected
// to request the missing containers from the validator. Once the ancestry
// is complete, this engine should send this validator the current
// preferences in a Chits message. The Chits message should have the same
// requestID that was passed in here.
PullQuery(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
// Notify this engine that the specified validator queried it about the
// specified container. That is, the validator would like to know whether
// this engine prefers the specified container. If the ancestry of the
// container is incomplete, request it. Once complete, sends this validator
// the current preferences.
PushQuery(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte)
// Notify this engine of a request for our preferences.
//
// This function can be called by any validator. It is not safe to assume
// this message is utilizing a unique requestID or even that the containerID
// matches the ID of the container bytes. However, the validatorID is
// assumed to be authenticated.
//
// This function is meant to behave the same way as PullQuery, except the
// container is optimistically provided to potentially remove the need for
// a series of Get/Put messages.
//
// If the ancestry of the container is incomplete, this engine is expected
// to request the ancestry from the validator. Once the ancestry is
// complete, this engine should send this validator the current preferences
// in a Chits message. The Chits message should have the same requestID that
// was passed in here.
PushQuery(
validatorID ids.ShortID,
requestID uint32,
containerID ids.ID,
container []byte,
)
// Notify this engine of the specified validators preferences.
//
// This function can be called by any validator. It is not safe to assume
// this message is in response to a PullQuery or a PushQuery message.
// However, the validatorID is assumed to be authenticated.
Chits(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set)
// Notify this engine that a query it issued has failed.
//
// This function will be called if the engine sent a PullQuery or PushQuery
// message that is not anticipated to be responded to. This could be because
// the recipient of the message is unknown or if the message request has
// timed out.
//
// The validatorID and the requestID are assumed to be the same as those
// sent in the Query message.
QueryFailed(validatorID ids.ShortID, requestID uint32)
}
@ -110,14 +216,19 @@ type QueryHandler interface {
// other components of this validator
type InternalHandler interface {
// Startup this engine.
//
// This function will be called once the environment is configured to be
// able to run the engine.
Startup()
// Gossip to the network a container on the accepted frontier
Gossip()
// Shutdown this engine.
//
// This function will be called when the environment is exiting.
Shutdown()
// Notify this engine that the vm has sent a message to it.
// Notify this engine of a message from the virtual machine.
Notify(Message)
}

View File

@ -0,0 +1,88 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package common
import (
"github.com/ava-labs/gecko/ids"
)
type req struct {
vdr ids.ShortID
id uint32
}
// Requests tracks pending container messages from a peer.
type Requests struct {
reqsToID map[[20]byte]map[uint32]ids.ID
idToReq map[[32]byte]req
}
// Add a request. Assumes that requestIDs are unique. Assumes that containerIDs
// are only in one request at a time.
func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) {
	// lazily allocate the forward index
	if r.reqsToID == nil {
		r.reqsToID = make(map[[20]byte]map[uint32]ids.ID)
	}
	vdrKey := vdr.Key()
	vdrReqs := r.reqsToID[vdrKey]
	if vdrReqs == nil {
		vdrReqs = make(map[uint32]ids.ID)
		r.reqsToID[vdrKey] = vdrReqs
	}
	vdrReqs[requestID] = containerID

	// lazily allocate the reverse index and record the inverse mapping
	if r.idToReq == nil {
		r.idToReq = make(map[[32]byte]req)
	}
	r.idToReq[containerID.Key()] = req{vdr: vdr, id: requestID}
}
// Remove attempts to abandon a requestID sent to a validator. If the request is
// currently outstanding, the requested ID will be returned along with true. If
// the request isn't currently outstanding, false will be returned.
func (r *Requests) Remove(vdr ids.ShortID, requestID uint32) (ids.ID, bool) {
	vdrKey := vdr.Key()
	// a lookup on a nil or missing inner map yields ok == false
	vdrReqs := r.reqsToID[vdrKey]
	containerID, ok := vdrReqs[requestID]
	if !ok {
		return ids.ID{}, false
	}

	// drop the whole per-validator map once its last entry is removed, so we
	// don't leak empty maps
	if len(vdrReqs) == 1 {
		delete(r.reqsToID, vdrKey)
	} else {
		delete(vdrReqs, requestID)
	}

	delete(r.idToReq, containerID.Key())
	return containerID, true
}
// RemoveAny removes any outstanding request for the container ID. True is
// returned if the container ID had an outstanding request.
func (r *Requests) RemoveAny(containerID ids.ID) bool {
	// name the local "request" rather than "req" so it doesn't shadow the
	// package-level req type
	request, ok := r.idToReq[containerID.Key()]
	if !ok {
		return false
	}

	// Remove also cleans up the reverse index entry for containerID
	r.Remove(request.vdr, request.id)
	return true
}
// Len returns the total number of outstanding requests. Every pair added with
// Add and not yet removed counts exactly once.
func (r *Requests) Len() int { return len(r.idToReq) }
// Contains returns true if there is an outstanding request for the container
// ID.
func (r *Requests) Contains(containerID ids.ID) bool {
	_, exists := r.idToReq[containerID.Key()]
	return exists
}

View File

@ -0,0 +1,90 @@
// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package common
import (
"testing"
"github.com/stretchr/testify/assert"
"github.com/ava-labs/gecko/ids"
)
// TestRequests exercises the Requests index through its full lifecycle:
// queries on an empty index, adds, mismatched removes, targeted removes, and
// removal by container ID.
func TestRequests(t *testing.T) {
	req := Requests{}

	// a zero-value Requests should behave as empty
	length := req.Len()
	assert.Equal(t, 0, length, "should have had no outstanding requests")

	_, removed := req.Remove(ids.ShortEmpty, 0)
	assert.False(t, removed, "shouldn't have removed the request")

	removed = req.RemoveAny(ids.Empty)
	assert.False(t, removed, "shouldn't have removed the request")

	contains := req.Contains(ids.Empty)
	assert.False(t, contains, "shouldn't contain this request")

	req.Add(ids.ShortEmpty, 0, ids.Empty)

	length = req.Len()
	assert.Equal(t, 1, length, "should have had one outstanding request")

	// removes with the wrong requestID or wrong validator must be no-ops
	_, removed = req.Remove(ids.ShortEmpty, 1)
	assert.False(t, removed, "shouldn't have removed the request")

	_, removed = req.Remove(ids.NewShortID([20]byte{1}), 0)
	assert.False(t, removed, "shouldn't have removed the request")

	contains = req.Contains(ids.Empty)
	assert.True(t, contains, "should contain this request")

	length = req.Len()
	assert.Equal(t, 1, length, "should have had one outstanding request")

	req.Add(ids.ShortEmpty, 10, ids.Empty.Prefix(0))

	length = req.Len()
	assert.Equal(t, 2, length, "should have had two outstanding requests")

	_, removed = req.Remove(ids.ShortEmpty, 1)
	assert.False(t, removed, "shouldn't have removed the request")

	_, removed = req.Remove(ids.NewShortID([20]byte{1}), 0)
	assert.False(t, removed, "shouldn't have removed the request")

	contains = req.Contains(ids.Empty)
	assert.True(t, contains, "should contain this request")

	length = req.Len()
	assert.Equal(t, 2, length, "should have had two outstanding requests")

	// a matching remove returns the container ID that was requested
	removedID, removed := req.Remove(ids.ShortEmpty, 0)
	assert.True(t, removedID.Equals(ids.Empty), "should have removed the requested ID")
	assert.True(t, removed, "should have removed the request")

	removedID, removed = req.Remove(ids.ShortEmpty, 10)
	assert.True(t, removedID.Equals(ids.Empty.Prefix(0)), "should have removed the requested ID")
	assert.True(t, removed, "should have removed the request")

	length = req.Len()
	assert.Equal(t, 0, length, "should have had no outstanding requests")

	// RemoveAny drops a request by container ID; a second call is a no-op
	req.Add(ids.ShortEmpty, 0, ids.Empty)

	length = req.Len()
	assert.Equal(t, 1, length, "should have had one outstanding request")

	removed = req.RemoveAny(ids.Empty)
	assert.True(t, removed, "should have removed the request")

	length = req.Len()
	assert.Equal(t, 0, length, "should have had no outstanding requests")

	removed = req.RemoveAny(ids.Empty)
	assert.False(t, removed, "shouldn't have removed the request")

	length = req.Len()
	assert.Equal(t, 0, length, "should have had no outstanding requests")
}

View File

@ -42,7 +42,8 @@ type EngineTest struct {
StartupF, GossipF, ShutdownF func()
ContextF func() *snow.Context
NotifyF func(Message)
GetF, GetFailedF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
GetF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
GetFailedF func(validatorID ids.ShortID, requestID uint32)
PutF, PushQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte)
GetAcceptedFrontierF, GetAcceptedFrontierFailedF, GetAcceptedFailedF, QueryFailedF func(validatorID ids.ShortID, requestID uint32)
AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set)
@ -187,9 +188,9 @@ func (e *EngineTest) Get(validatorID ids.ShortID, requestID uint32, containerID
}
// GetFailed ...
func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32) {
if e.GetFailedF != nil {
e.GetFailedF(validatorID, requestID, containerID)
e.GetFailedF(validatorID, requestID)
} else if e.CantGetFailed && e.T != nil {
e.T.Fatalf("Unexpectedly called GetFailed")
}

View File

@ -20,6 +20,9 @@ type BootstrapConfig struct {
// Blocked tracks operations that are blocked on blocks
Blocked *queue.Jobs
// blocks that have outstanding get requests
blkReqs common.Requests
VM ChainVM
Bootstrapped func()
@ -84,16 +87,22 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) {
func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) {
b.BootstrapConfig.Context.Log.Verbo("Put called for blkID %s", blkID)
if !b.pending.Contains(blkID) {
return
}
blk, err := b.VM.ParseBlock(blkBytes)
if err != nil {
b.BootstrapConfig.Context.Log.Debug("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
b.GetFailed(vdr, requestID, blkID)
b.GetFailed(vdr, requestID)
return
}
if !b.pending.Contains(blk.ID()) {
b.BootstrapConfig.Context.Log.Debug("Validator %s sent an unrequested block:\n%s",
vdr,
formatting.DumpBytes{Bytes: blkBytes})
b.GetFailed(vdr, requestID)
return
}
@ -101,7 +110,15 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkB
}
// GetFailed ...
func (b *bootstrapper) GetFailed(_ ids.ShortID, _ uint32, blkID ids.ID) { b.sendRequest(blkID) }
func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) {
	// look up which block this (validator, requestID) pair was fetching; if
	// there is no record, this failure doesn't match a Get we sent, so drop it
	blkID, ok := b.blkReqs.Remove(vdr, requestID)
	if !ok {
		b.BootstrapConfig.Context.Log.Debug("GetFailed called without sending the corresponding Get message from %s",
			vdr)
		return
	}
	// retry fetching the block, potentially from a different validator
	b.sendRequest(blkID)
}
func (b *bootstrapper) fetch(blkID ids.ID) {
if b.pending.Contains(blkID) {
@ -125,6 +142,9 @@ func (b *bootstrapper) sendRequest(blkID ids.ID) {
validatorID := validators[0].ID()
b.RequestID++
b.blkReqs.RemoveAny(blkID)
b.blkReqs.Add(validatorID, b.RequestID, blkID)
b.pending.Add(blkID)
b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, blkID)

View File

@ -223,12 +223,13 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) {
bs.ForceAccepted(acceptedIDs)
vm.GetBlockF = nil
sender.GetF = nil
vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
switch {
case bytes.Equal(blkBytes, blkBytes1):
return blk1, nil
case bytes.Equal(blkBytes, blkBytes2):
return blk2, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
@ -477,3 +478,113 @@ func TestBootstrapperPartialFetch(t *testing.T) {
t.Fatalf("wrong number pending")
}
}
// TestBootstrapperWrongIDByzantineResponse checks that a Put whose bytes parse
// to a different block than the one requested doesn't satisfy the outstanding
// request: bootstrapping must keep waiting until the correct block arrives.
func TestBootstrapperWrongIDByzantineResponse(t *testing.T) {
	config, peerID, sender, vm := newConfig(t)

	blkID0 := ids.Empty.Prefix(0)
	blkID1 := ids.Empty.Prefix(1)
	blkID2 := ids.Empty.Prefix(2)

	blkBytes0 := []byte{0}
	blkBytes1 := []byte{1}
	blkBytes2 := []byte{2}

	// chain: blk0 <- blk1 <- blk2; only blk0 is already accepted
	blk0 := &Blk{
		id:     blkID0,
		height: 0,
		status: choices.Accepted,
		bytes:  blkBytes0,
	}
	blk1 := &Blk{
		parent: blk0,
		id:     blkID1,
		height: 1,
		status: choices.Processing,
		bytes:  blkBytes1,
	}
	blk2 := &Blk{
		parent: blk1,
		id:     blkID2,
		height: 2,
		status: choices.Processing,
		bytes:  blkBytes2,
	}

	bs := bootstrapper{}
	bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry())
	bs.Initialize(config)

	acceptedIDs := ids.Set{}
	acceptedIDs.Add(blkID1)

	// the VM doesn't have blk1 locally, forcing the bootstrapper to fetch it
	vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
		switch {
		case blkID.Equals(blkID1):
			return nil, errUnknownBlock
		default:
			t.Fatal(errUnknownBlock)
			panic(errUnknownBlock)
		}
	}

	// capture the requestID of the Get sent for blk1
	requestID := new(uint32)
	sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) {
		if !vdr.Equals(peerID) {
			t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr)
		}
		switch {
		case vtxID.Equals(blkID1):
		default:
			t.Fatalf("Requested unknown block")
		}

		*requestID = reqID
	}

	bs.ForceAccepted(acceptedIDs)

	vm.GetBlockF = nil
	sender.GetF = nil

	// byzantine response: answer the blk1 request with blk2's bytes
	vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
		switch {
		case bytes.Equal(blkBytes, blkBytes2):
			return blk2, nil
		}
		t.Fatal(errUnknownBlock)
		return nil, errUnknownBlock
	}

	sender.CantGet = false
	bs.Put(peerID, *requestID, blkID1, blkBytes2)
	sender.CantGet = true

	// now deliver the correct bytes for blk1; this should finish bootstrapping
	vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) {
		switch {
		case bytes.Equal(blkBytes, blkBytes1):
			return blk1, nil
		}
		t.Fatal(errUnknownBlock)
		return nil, errUnknownBlock
	}

	finished := new(bool)
	bs.onFinished = func() { *finished = true }
	bs.Put(peerID, *requestID, blkID1, blkBytes1)
	vm.ParseBlockF = nil

	if !*finished {
		t.Fatalf("Bootstrapping should have finished")
	}
	if blk1.Status() != choices.Accepted {
		t.Fatalf("Block should be accepted")
	}
	// blk2 was never legitimately requested, so it must remain processing
	if blk2.Status() != choices.Processing {
		t.Fatalf("Block should be processing")
	}
}

View File

@ -19,12 +19,22 @@ type Transitive struct {
Config
bootstrapper
polls polls // track people I have asked for their preference
// track outstanding preference requests
polls polls
blkReqs, pending ids.Set // prevent asking validators for the same block
// blocks that have outstanding get requests
blkReqs common.Requests
blocked events.Blocker // track operations that are blocked on blocks
// blocks that are fetched but haven't been issued due to missing
// dependencies
pending ids.Set
// operations that are blocked on a block being issued. This could be
// issuing another block, responding to a query, or applying votes to
// consensus
blocked events.Blocker
// mark for if the engine has been bootstrapped or not
bootstrapped bool
}
@ -33,7 +43,11 @@ func (t *Transitive) Initialize(config Config) {
config.Context.Log.Info("Initializing Snowman consensus")
t.Config = config
t.metrics.Initialize(config.Context.Log, config.Params.Namespace, config.Params.Metrics)
t.metrics.Initialize(
config.Context.Log,
config.Params.Namespace,
config.Params.Metrics,
)
t.onFinished = t.finishBootstrapping
t.bootstrapper.Initialize(config.BootstrapConfig)
@ -44,11 +58,19 @@ func (t *Transitive) Initialize(config Config) {
t.polls.m = make(map[uint32]poll)
}
// when bootstrapping is finished, this will be called. This initializes the
// consensus engine with the last accepted block.
func (t *Transitive) finishBootstrapping() {
// set the bootstrapped mark to switch consensus modes
t.bootstrapped = true
// initialize consensus to the last accepted blockID
tailID := t.Config.VM.LastAccepted()
t.Consensus.Initialize(t.Config.Context, t.Params, tailID)
// to maintain the invariant that oracle blocks are issued in the correct
// preferences, we need to handle the case that we are bootstrapping into an
// oracle block
tail, err := t.Config.VM.GetBlock(tailID)
if err != nil {
t.Config.Context.Log.Error("Failed to get last accepted block due to: %s", err)
@ -58,9 +80,12 @@ func (t *Transitive) finishBootstrapping() {
switch blk := tail.(type) {
case OracleBlock:
for _, blk := range blk.Options() {
// note that deliver will set the VM's preference
t.deliver(blk)
}
default:
// if there aren't blocks we need to deliver on startup, we need to set
// the preference to the last accepted block
t.Config.VM.SetPreference(tailID)
}
@ -91,15 +116,27 @@ func (t *Transitive) Context() *snow.Context { return t.Config.Context }
// Get implements the Engine interface
func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, blkID ids.ID) {
if blk, err := t.Config.VM.GetBlock(blkID); err == nil {
t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes())
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
// If we failed to get the block, that means either an unexpected error
// has occurred, the validator is not following the protocol, or the
// block has been pruned.
t.Config.Context.Log.Warn("Get called for blockID %s errored with %s",
blkID,
err)
return
}
// Respond to the validator with the fetched block and the same requestID.
t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes())
}
// Put implements the Engine interface
func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) {
t.Config.Context.Log.Verbo("Put called for blockID %s", blkID)
// if the engine hasn't been bootstrapped, forward the request to the
// bootstrapper
if !t.bootstrapped {
t.bootstrapper.Put(vdr, requestID, blkID, blkBytes)
return
@ -110,32 +147,53 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkByt
t.Config.Context.Log.Debug("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
t.GetFailed(vdr, requestID, blkID)
// because GetFailed doesn't utilize the assumption that we actually
// sent a Get message, we can safely call GetFailed here to potentially
// abandon the request.
t.GetFailed(vdr, requestID)
return
}
// insert the block into consensus. If the block has already been issued,
// this will be a noop. If this block has missing dependencies, vdr will
// receive requests to fill the ancestry. dependencies that have already
// been fetched, but with missing dependencies themselves won't be requested
// from the vdr.
t.insertFrom(vdr, blk)
}
// GetFailed implements the Engine interface
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, blkID ids.ID) {
func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) {
// if the engine hasn't been bootstrapped, forward the request to the
// bootstrapper
if !t.bootstrapped {
t.bootstrapper.GetFailed(vdr, requestID, blkID)
t.bootstrapper.GetFailed(vdr, requestID)
return
}
t.pending.Remove(blkID)
t.blocked.Abandon(blkID)
t.blkReqs.Remove(blkID)
// we don't use the assumption that this function is called after a failed
	// Get message. So we first check to see if we have an outstanding request
// and also get what the request was for if it exists
blkID, ok := t.blkReqs.Remove(vdr, requestID)
if !ok {
t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s",
vdr)
return
}
// Tracks performance statistics
t.numBlockedBlk.Set(float64(t.pending.Len()))
	// because the get request was dropped, we no longer expect blkID to
	// be issued.
t.blocked.Abandon(blkID)
}
// PullQuery implements the Engine interface
func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) {
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", blkID)
t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping",
blkID)
return
}
@ -146,6 +204,8 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID)
requestID: requestID,
}
// if we aren't able to have issued this block, then it is a dependency for
// this reply
if !t.reinsertFrom(vdr, blkID) {
c.deps.Add(blkID)
}
@ -154,18 +214,37 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID)
}
// PushQuery implements the Engine interface
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blk []byte) {
func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) {
// if the engine hasn't been bootstrapped, we aren't ready to respond to
// queries
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", blkID)
return
}
t.Put(vdr, requestID, blkID, blk)
t.PullQuery(vdr, requestID, blkID)
blk, err := t.Config.VM.ParseBlock(blkBytes)
// If the parsing fails, we just drop the request, as we didn't ask for it
if err != nil {
t.Config.Context.Log.Warn("ParseBlock failed due to %s for block:\n%s",
err,
formatting.DumpBytes{Bytes: blkBytes})
return
}
// insert the block into consensus. If the block has already been issued,
// this will be a noop. If this block has missing dependencies, vdr will
// receive requests to fill the ancestry. dependencies that have already
// been fetched, but with missing dependencies themselves won't be requested
// from the vdr.
t.insertFrom(vdr, blk)
// register the chit request
t.PullQuery(vdr, requestID, blk.ID())
}
// Chits implements the Engine interface
func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
// if the engine hasn't been bootstrapped, we shouldn't be receiving chits
if !t.bootstrapped {
t.Config.Context.Log.Debug("Dropping Chits due to bootstrapping")
return
@ -173,7 +252,14 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
// Since this is snowman, there should only be one ID in the vote set
if votes.Len() != 1 {
t.Config.Context.Log.Debug("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d", votes.Len(), vdr, requestID)
t.Config.Context.Log.Debug("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d",
votes.Len(),
vdr,
requestID)
// because QueryFailed doesn't utilize the assumption that we actually
// sent a Query message, we can safely call QueryFailed here to
// potentially abandon the request.
t.QueryFailed(vdr, requestID)
return
}
@ -188,6 +274,8 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
response: vote,
}
// if we aren't able to have issued the vote's block, then it is a
// dependency for applying the vote
if !t.reinsertFrom(vdr, vote) {
v.deps.Add(vote)
}
@ -197,6 +285,7 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
// QueryFailed implements the Engine interface
func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) {
// if the engine hasn't been bootstrapped, we won't have sent a query
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping QueryFailed due to bootstrapping")
return
@ -211,6 +300,7 @@ func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) {
// Notify implements the Engine interface
func (t *Transitive) Notify(msg common.Message) {
	// if the engine hasn't been bootstrapped, we shouldn't be issuing blocks
if !t.bootstrapped {
t.Config.Context.Log.Warn("Dropping Notify due to bootstrapping")
return
@ -219,21 +309,32 @@ func (t *Transitive) Notify(msg common.Message) {
t.Config.Context.Log.Verbo("Snowman engine notified of %s from the vm", msg)
switch msg {
case common.PendingTxs:
if blk, err := t.Config.VM.BuildBlock(); err == nil {
if status := blk.Status(); status != choices.Processing {
t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status)
}
parentID := blk.Parent().ID()
if pref := t.Consensus.Preference(); !parentID.Equals(pref) {
t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref)
}
if t.insertAll(blk) {
t.Config.Context.Log.Verbo("Successfully issued new block from the VM")
} else {
t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors")
}
} else {
// the pending txs message means we should attempt to build a block.
blk, err := t.Config.VM.BuildBlock()
if err != nil {
t.Config.Context.Log.Verbo("VM.BuildBlock errored with %s", err)
return
}
// a newly created block is expected to be processing. If this check
// fails, there is potentially an error in the VM this engine is running
if status := blk.Status(); status != choices.Processing {
t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status)
}
// the newly created block should be built on top of the preferred
// block. Otherwise, the new block doesn't have the best chance of being
// confirmed.
parentID := blk.Parent().ID()
if pref := t.Consensus.Preference(); !parentID.Equals(pref) {
t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref)
}
// inserting the block shouldn't have any missing dependencies
if t.insertAll(blk) {
t.Config.Context.Log.Verbo("Successfully issued new block from the VM")
} else {
t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors")
}
default:
t.Config.Context.Log.Warn("Unexpected message from the VM: %s", msg)
@ -241,10 +342,20 @@ func (t *Transitive) Notify(msg common.Message) {
}
func (t *Transitive) repoll() {
// if we are issuing a repoll, we should gossip our current preferences to
// propagate the most likely branch as quickly as possible
prefID := t.Consensus.Preference()
t.pullSample(prefID)
for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ {
t.pullSample(prefID)
}
}
// reinsertFrom attempts to issue the branch ending with a block, from only its
// ID, to consensus. Returns true if the block was added, or was previously
// added, to consensus. This is useful to check the local DB before requesting a
// block in case we have the block for some reason. If the block or a dependency
// is missing, the validator will be sent a Get message.
func (t *Transitive) reinsertFrom(vdr ids.ShortID, blkID ids.ID) bool {
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
@ -254,44 +365,81 @@ func (t *Transitive) reinsertFrom(vdr ids.ShortID, blkID ids.ID) bool {
return t.insertFrom(vdr, blk)
}
// insertFrom attempts to issue the branch ending with a block to consensus.
// Returns true if the block was added, or was previously added, to consensus.
// This is useful to check the local DB before requesting a block in case we
// have the block for some reason. If a dependency is missing, the validator
// will be sent a Get message.
func (t *Transitive) insertFrom(vdr ids.ShortID, blk snowman.Block) bool {
blkID := blk.ID()
// if the block has been issued, we don't need to insert it. if the block is
// already pending, we shouldn't attempt to insert it again yet
for !t.Consensus.Issued(blk) && !t.pending.Contains(blkID) {
t.insert(blk)
parent := blk.Parent()
parentID := parent.ID()
if parentStatus := parent.Status(); !parentStatus.Fetched() {
t.sendRequest(vdr, parentID)
blk = blk.Parent()
blkID = blk.ID()
// if the parent hasn't been fetched, we need to request it to issue the
// newly inserted block
if !blk.Status().Fetched() {
t.sendRequest(vdr, blkID)
return false
}
blk = parent
blkID = parentID
}
return !t.pending.Contains(blkID)
return t.Consensus.Issued(blk)
}
// insertAll attempts to issue the branch ending with a block to consensus.
// Returns true if the block was added, or was previously added, to consensus.
// This is useful to check the local DB before requesting a block in case we
// have the block for some reason. If a dependency is missing and the dependency
// hasn't been requested, the issuance will be abandoned.
func (t *Transitive) insertAll(blk snowman.Block) bool {
blkID := blk.ID()
for blk.Status().Fetched() && !t.Consensus.Issued(blk) && !t.pending.Contains(blkID) {
t.insert(blk)
blk = blk.Parent()
blkID = blk.ID()
}
return !t.pending.Contains(blkID)
	// if issuance of the block was successful, this is the happy path
if t.Consensus.Issued(blk) {
return true
}
// if this branch is waiting on a block that we supposedly have a source of,
// we can just wait for that request to succeed or fail
if t.blkReqs.Contains(blkID) {
return false
}
// if we have no reason to expect that this block will be inserted, we
// should abandon the block to avoid a memory leak
t.blocked.Abandon(blkID)
return false
}
// attempt to insert the block to consensus. If the block's parent hasn't been
// issued, the insertion will block until the parent's issuance is abandoned or
// fulfilled
func (t *Transitive) insert(blk snowman.Block) {
blkID := blk.ID()
// mark that the block has been fetched but is pending
t.pending.Add(blkID)
t.blkReqs.Remove(blkID)
// if we have any outstanding requests for this block, remove the pending
// requests
t.blkReqs.RemoveAny(blkID)
i := &issuer{
t: t,
blk: blk,
}
// block on the parent if needed
if parent := blk.Parent(); !t.Consensus.Issued(parent) {
parentID := parent.ID()
t.Config.Context.Log.Verbo("Block waiting for parent %s", parentID)
@ -306,17 +454,22 @@ func (t *Transitive) insert(blk snowman.Block) {
}
func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) {
if !t.blkReqs.Contains(blkID) {
t.blkReqs.Add(blkID)
t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics
t.RequestID++
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.Config.Sender.Get(vdr, t.RequestID, blkID)
// only send one request at a time for a block
if t.blkReqs.Contains(blkID) {
return
}
t.Config.Context.Log.Verbo("Sending Get message for %s", blkID)
t.RequestID++
t.blkReqs.Add(vdr, t.RequestID, blkID)
t.Config.Sender.Get(vdr, t.RequestID, blkID)
// Tracks performance statistics
t.numBlkRequests.Set(float64(t.blkReqs.Len()))
}
// send a pull request for this block ID
func (t *Transitive) pullSample(blkID ids.ID) {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
@ -326,15 +479,22 @@ func (t *Transitive) pullSample(blkID ids.ID) {
vdrSet.Add(vdr.ID())
}
t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID)
} else if numVdrs < p.K {
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID)
return
}
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID)
return
}
t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID)
}
func (t *Transitive) pushSample(blk snowman.Block) bool {
// send a push request for this block
func (t *Transitive) pushSample(blk snowman.Block) {
t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators)
p := t.Consensus.Parameters()
vdrs := t.Config.Validators.Sample(p.K)
@ -343,15 +503,20 @@ func (t *Transitive) pushSample(blk snowman.Block) bool {
vdrSet.Add(vdr.ID())
}
t.RequestID++
queryIssued := false
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blk.ID(), blk.Bytes())
queryIssued = true
} else if numVdrs < p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blk.ID())
blkID := blk.ID()
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID)
return
}
return queryIssued
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID)
return
}
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes())
return
}
func (t *Transitive) deliver(blk snowman.Block) {
@ -359,11 +524,14 @@ func (t *Transitive) deliver(blk snowman.Block) {
return
}
// we are adding the block to consensus, so it is no longer pending
blkID := blk.ID()
t.pending.Remove(blkID)
if err := blk.Verify(); err != nil {
t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err)
		// if verify fails, then all descendants are also invalid
t.blocked.Abandon(blkID)
t.numBlockedBlk.Set(float64(t.pending.Len())) // Tracks performance statistics
return
@ -371,8 +539,10 @@ func (t *Transitive) deliver(blk snowman.Block) {
t.Config.Context.Log.Verbo("Adding block to consensus: %s", blkID)
t.Consensus.Add(blk)
polled := t.pushSample(blk)
// Add all the oracle blocks if they exist. We call verify on all the blocks
// and add them to consensus before marking anything as fulfilled to avoid
// any potential reentrant bugs.
added := []snowman.Block{}
dropped := []snowman.Block{}
switch blk := blk.(type) {
@ -380,20 +550,23 @@ func (t *Transitive) deliver(blk snowman.Block) {
for _, blk := range blk.Options() {
if err := blk.Verify(); err != nil {
t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err)
t.blocked.Abandon(blk.ID())
dropped = append(dropped, blk)
} else {
t.Consensus.Add(blk)
t.pushSample(blk)
added = append(added, blk)
}
}
}
t.Config.VM.SetPreference(t.Consensus.Preference())
t.blocked.Fulfill(blkID)
// launch a query for the newly added block
t.pushSample(blk)
t.blocked.Fulfill(blkID)
for _, blk := range added {
t.pushSample(blk)
blkID := blk.ID()
t.pending.Remove(blkID)
t.blocked.Fulfill(blkID)
@ -404,9 +577,8 @@ func (t *Transitive) deliver(blk snowman.Block) {
t.blocked.Abandon(blkID)
}
if polled && len(t.polls.m) < t.Params.ConcurrentRepolls {
t.repoll()
}
// If we should issue multiple queries at the same time, we need to repoll
t.repoll()
// Tracks performance statistics
t.numBlkRequests.Set(float64(t.blkReqs.Len()))

View File

@ -102,7 +102,9 @@ func TestEngineAdd(t *testing.T) {
}
asked := new(bool)
sender.GetF = func(inVdr ids.ShortID, _ uint32, blkID ids.ID) {
reqID := new(uint32)
sender.GetF = func(inVdr ids.ShortID, requestID uint32, blkID ids.ID) {
*reqID = requestID
if *asked {
t.Fatalf("Asked multiple times")
}
@ -136,7 +138,7 @@ func TestEngineAdd(t *testing.T) {
vm.ParseBlockF = func(b []byte) (snowman.Block, error) { return nil, errParseBlock }
te.Put(vdr.ID(), 0, blk.Parent().ID(), nil)
te.Put(vdr.ID(), *reqID, blk.Parent().ID(), nil)
vm.ParseBlockF = nil
@ -906,7 +908,11 @@ func TestEngineAbandonQuery(t *testing.T) {
panic("Should have failed")
}
}
sender.CantGet = false
reqID := new(uint32)
sender.GetF = func(_ ids.ShortID, requestID uint32, _ ids.ID) {
*reqID = requestID
}
te.PullQuery(vdr.ID(), 0, blkID)
@ -914,7 +920,7 @@ func TestEngineAbandonQuery(t *testing.T) {
t.Fatalf("Should have blocked on request")
}
te.GetFailed(vdr.ID(), 0, blkID)
te.GetFailed(vdr.ID(), *reqID)
if len(te.blocked) != 0 {
t.Fatalf("Should have removed request")
@ -947,7 +953,12 @@ func TestEngineAbandonChit(t *testing.T) {
panic("Should have failed")
}
}
sender.CantGet = false
reqID := new(uint32)
sender.GetF = func(_ ids.ShortID, requestID uint32, _ ids.ID) {
*reqID = requestID
}
fakeBlkIDSet := ids.Set{}
fakeBlkIDSet.Add(fakeBlkID)
te.Chits(vdr.ID(), 0, fakeBlkIDSet)
@ -956,7 +967,7 @@ func TestEngineAbandonChit(t *testing.T) {
t.Fatalf("Should have blocked on request")
}
te.GetFailed(vdr.ID(), 0, fakeBlkID)
te.GetFailed(vdr.ID(), *reqID)
if len(te.blocked) != 0 {
t.Fatalf("Should have removed request")
@ -1105,14 +1116,18 @@ func TestEngineRetryFetch(t *testing.T) {
}
vm.CantGetBlock = false
sender.CantGet = false
reqID := new(uint32)
sender.GetF = func(_ ids.ShortID, requestID uint32, _ ids.ID) {
*reqID = requestID
}
te.PullQuery(vdr.ID(), 0, missingBlk.ID())
vm.CantGetBlock = true
sender.CantGet = true
sender.GetF = nil
te.GetFailed(vdr.ID(), 0, missingBlk.ID())
te.GetFailed(vdr.ID(), *reqID)
vm.CantGetBlock = false
@ -1124,7 +1139,7 @@ func TestEngineRetryFetch(t *testing.T) {
te.PullQuery(vdr.ID(), 0, missingBlk.ID())
vm.CantGetBlock = true
sender.CantGet = true
sender.GetF = nil
if !*called {
t.Fatalf("Should have requested the block again")
@ -1220,3 +1235,290 @@ func TestEngineGossip(t *testing.T) {
t.Fatalf("Should have gossiped the block")
}
}
func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) {
vdr, vdrs, sender, vm, te, gBlk := setup(t)
secondVdr := validators.GenerateRandomValidator(1)
vdrs.Add(secondVdr)
sender.Default(true)
missingBlk := &Blk{
parent: gBlk,
id: GenerateID(),
height: 1,
status: choices.Unknown,
bytes: []byte{1},
}
pendingBlk := &Blk{
parent: missingBlk,
id: GenerateID(),
height: 2,
status: choices.Processing,
bytes: []byte{2},
}
parsed := new(bool)
vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
switch {
case bytes.Equal(b, pendingBlk.Bytes()):
*parsed = true
return pendingBlk, nil
}
return nil, errUnknownBlock
}
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
if !*parsed {
return nil, errUnknownBlock
}
switch {
case blkID.Equals(pendingBlk.ID()):
return pendingBlk, nil
}
return nil, errUnknownBlock
}
reqID := new(uint32)
sender.GetF = func(reqVdr ids.ShortID, requestID uint32, blkID ids.ID) {
*reqID = requestID
if !reqVdr.Equals(vdr.ID()) {
t.Fatalf("Wrong validator requested")
}
if !blkID.Equals(missingBlk.ID()) {
t.Fatalf("Wrong block requested")
}
}
te.PushQuery(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes())
te.Put(secondVdr.ID(), *reqID, missingBlk.ID(), []byte{3})
*parsed = false
vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
switch {
case bytes.Equal(b, missingBlk.Bytes()):
*parsed = true
return missingBlk, nil
}
return nil, errUnknownBlock
}
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
if !*parsed {
return nil, errUnknownBlock
}
switch {
case blkID.Equals(missingBlk.ID()):
return missingBlk, nil
}
return nil, errUnknownBlock
}
sender.CantPushQuery = false
sender.CantChits = false
missingBlk.status = choices.Processing
te.Put(vdr.ID(), *reqID, missingBlk.ID(), missingBlk.Bytes())
pref := te.Consensus.Preference()
if !pref.Equals(pendingBlk.ID()) {
t.Fatalf("Shouldn't have abandoned the pending block")
}
}
func TestEnginePushQueryRequestIDConflict(t *testing.T) {
vdr, _, sender, vm, te, gBlk := setup(t)
sender.Default(true)
missingBlk := &Blk{
parent: gBlk,
id: GenerateID(),
height: 1,
status: choices.Unknown,
bytes: []byte{1},
}
pendingBlk := &Blk{
parent: missingBlk,
id: GenerateID(),
height: 2,
status: choices.Processing,
bytes: []byte{2},
}
randomBlkID := GenerateID()
parsed := new(bool)
vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
switch {
case bytes.Equal(b, pendingBlk.Bytes()):
*parsed = true
return pendingBlk, nil
}
return nil, errUnknownBlock
}
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
if !*parsed {
return nil, errUnknownBlock
}
switch {
case blkID.Equals(pendingBlk.ID()):
return pendingBlk, nil
}
return nil, errUnknownBlock
}
reqID := new(uint32)
sender.GetF = func(reqVdr ids.ShortID, requestID uint32, blkID ids.ID) {
*reqID = requestID
if !reqVdr.Equals(vdr.ID()) {
t.Fatalf("Wrong validator requested")
}
if !blkID.Equals(missingBlk.ID()) {
t.Fatalf("Wrong block requested")
}
}
te.PushQuery(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes())
sender.GetF = nil
sender.CantGet = false
te.PushQuery(vdr.ID(), *reqID, randomBlkID, []byte{3})
*parsed = false
vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
switch {
case bytes.Equal(b, missingBlk.Bytes()):
*parsed = true
return missingBlk, nil
}
return nil, errUnknownBlock
}
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
if !*parsed {
return nil, errUnknownBlock
}
switch {
case blkID.Equals(missingBlk.ID()):
return missingBlk, nil
}
return nil, errUnknownBlock
}
sender.CantPushQuery = false
sender.CantChits = false
te.Put(vdr.ID(), *reqID, missingBlk.ID(), missingBlk.Bytes())
pref := te.Consensus.Preference()
if !pref.Equals(pendingBlk.ID()) {
t.Fatalf("Shouldn't have abandoned the pending block")
}
}
func TestEngineAggressivePolling(t *testing.T) {
config := DefaultConfig()
config.Params.ConcurrentRepolls = 2
vdr := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
config.Validators = vals
vals.Add(vdr)
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
vm := &VMTest{}
vm.T = t
config.VM = vm
vm.Default(true)
vm.CantSetPreference = false
gBlk := &Blk{
id: GenerateID(),
status: choices.Accepted,
}
vm.LastAcceptedF = func() ids.ID { return gBlk.ID() }
sender.CantGetAcceptedFrontier = false
te := &Transitive{}
te.Initialize(config)
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
if !blkID.Equals(gBlk.ID()) {
t.Fatalf("Wrong block requested")
}
return gBlk, nil
}
te.finishBootstrapping()
vm.GetBlockF = nil
vm.LastAcceptedF = nil
sender.CantGetAcceptedFrontier = true
sender.Default(true)
pendingBlk := &Blk{
parent: gBlk,
id: GenerateID(),
height: 2,
status: choices.Processing,
bytes: []byte{1},
}
parsed := new(bool)
vm.ParseBlockF = func(b []byte) (snowman.Block, error) {
switch {
case bytes.Equal(b, pendingBlk.Bytes()):
*parsed = true
return pendingBlk, nil
}
return nil, errUnknownBlock
}
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
if !*parsed {
return nil, errUnknownBlock
}
switch {
case blkID.Equals(pendingBlk.ID()):
return pendingBlk, nil
}
return nil, errUnknownBlock
}
numPushed := new(int)
sender.PushQueryF = func(_ ids.ShortSet, _ uint32, _ ids.ID, _ []byte) { *numPushed++ }
numPulled := new(int)
sender.PullQueryF = func(_ ids.ShortSet, _ uint32, _ ids.ID) { *numPulled++ }
te.Put(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes())
if *numPushed != 1 {
t.Fatalf("Should have initially sent a push query")
}
if *numPulled != 1 {
t.Fatalf("Should have sent an additional pull query")
}
}

View File

@ -56,10 +56,7 @@ func (v *voter) Update() {
}
v.t.Config.Context.Log.Verbo("Snowman engine can't quiesce")
if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls {
v.t.repoll()
}
v.t.repoll()
}
func (v *voter) bubbleVotes(votes ids.Bag) ids.Bag {

View File

@ -78,7 +78,7 @@ func (h *Handler) dispatchMsg(msg message) bool {
case getMsg:
h.engine.Get(msg.validatorID, msg.requestID, msg.containerID)
case getFailedMsg:
h.engine.GetFailed(msg.validatorID, msg.requestID, msg.containerID)
h.engine.GetFailed(msg.validatorID, msg.requestID)
case putMsg:
h.engine.Put(msg.validatorID, msg.requestID, msg.containerID, msg.container)
case pushQueryMsg:
@ -185,12 +185,11 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids
}
// GetFailed passes a GetFailed message to the consensus engine.
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
h.msgs <- message{
messageType: getFailedMsg,
validatorID: validatorID,
requestID: requestID,
containerID: containerID,
}
}

View File

@ -42,6 +42,6 @@ type ExternalRouter interface {
type InternalRouter interface {
GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID)
GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32)
}

View File

@ -187,13 +187,13 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui
// GetFailed routes an incoming GetFailed message from the validator with ID [validatorID]
// to the consensus engine working on the chain with ID [chainID]
func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
chain.GetFailed(validatorID, requestID, containerID)
chain.GetFailed(validatorID, requestID)
} else {
sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID)
}

View File

@ -88,7 +88,7 @@ func (s *Sender) Get(validatorID ids.ShortID, requestID uint32, containerID ids.
// Add a timeout -- if we don't get a response before the timeout expires,
// send this consensus engine a GetFailed message
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetFailed(validatorID, s.ctx.ChainID, requestID, containerID)
s.router.GetFailed(validatorID, s.ctx.ChainID, requestID)
})
s.sender.Get(validatorID, s.ctx.ChainID, requestID, containerID)
}