diff --git a/.editorconfig b/.editorconfig new file mode 100644 index 0000000..8fcd83f --- /dev/null +++ b/.editorconfig @@ -0,0 +1,6 @@ +# https://editorconfig.org/ + +[*] +end_of_line = lf +insert_final_newline = true +trim_trailing_whitespace = true diff --git a/networking/voting_handlers.go b/networking/voting_handlers.go index 3a27185..2c8dce6 100644 --- a/networking/voting_handlers.go +++ b/networking/voting_handlers.go @@ -234,7 +234,7 @@ func (s *Voting) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, peer, exists := s.conns.GetPeerID(validatorID) if !exists { s.log.Debug("attempted to send a Get message to a disconnected validator: %s", validatorID) - s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID, containerID) }) + s.executor.Add(func() { s.router.GetFailed(validatorID, chainID, requestID) }) return // Validator is not connected } diff --git a/scripts/build.sh b/scripts/build.sh index 46147a9..7f2ed44 100755 --- a/scripts/build.sh +++ b/scripts/build.sh @@ -1,4 +1,8 @@ -#!/bin/bash -e +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail # Ted: contact me when you make any changes diff --git a/scripts/build_image.sh b/scripts/build_image.sh index 5da7f59..b3401b6 100755 --- a/scripts/build_image.sh +++ b/scripts/build_image.sh @@ -1,4 +1,9 @@ -#!/bin/bash -e +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail + SRC_DIR="$(dirname "${BASH_SOURCE[0]}")" export GOPATH="$SRC_DIR/.build_image_gopath" WORKPREFIX="$GOPATH/src/github.com/ava-labs/" diff --git a/scripts/build_test.sh b/scripts/build_test.sh index d12bbf9..157dd8b 100755 --- a/scripts/build_test.sh +++ b/scripts/build_test.sh @@ -1,4 +1,8 @@ -#!/bin/bash -e +#!/bin/bash + +set -o errexit +set -o nounset +set -o pipefail # Ted: contact me when you make any changes diff --git a/snow/consensus/snowman/snowman_block.go b/snow/consensus/snowman/snowman_block.go index b0c1ba4..dd69387 100644 --- a/snow/consensus/snowman/snowman_block.go +++ 
b/snow/consensus/snowman/snowman_block.go @@ -16,7 +16,7 @@ type snowmanBlock struct { // block that this node contains. For the genesis, this value will be nil blk Block - // shouldFalter is set to true if this node, and all its decendants received + // shouldFalter is set to true if this node, and all its descendants received // less than Alpha votes shouldFalter bool diff --git a/snow/consensus/snowman/topological.go b/snow/consensus/snowman/topological.go index 459673f..e852030 100644 --- a/snow/consensus/snowman/topological.go +++ b/snow/consensus/snowman/topological.go @@ -149,7 +149,7 @@ func (ts *Topological) Preference() ids.ID { return ts.tail } // During the sort, votes are pushed towards the genesis. To prevent interating // over all blocks that had unsuccessful polls, we set a flag on the block to // know that any future traversal through that block should register an -// unsuccessful poll on that block and every decendant block. +// unsuccessful poll on that block and every descendant block. // // The complexity of this function is: // - Runtime = 3 * |live set| + |votes| @@ -408,7 +408,7 @@ func (ts *Topological) getPreferredDecendent(blkID ids.ID) ids.ID { // accept the preferred child of the provided snowman block. By accepting the // preferred child, all other children will be rejected. When these children are -// rejected, all their decendants will be rejected. +// rejected, all their descendants will be rejected. 
func (ts *Topological) accept(n *snowmanBlock) { // We are finalizing the block's child, so we need to get the preference pref := n.sb.Preference() @@ -451,11 +451,11 @@ func (ts *Topological) accept(n *snowmanBlock) { rejects = append(rejects, childID) } - // reject all the decendants of the blocks we just rejected + // reject all the descendants of the blocks we just rejected ts.rejectTransitively(rejects) } -// Takes in a list of rejected ids and rejects all decendants of these IDs +// Takes in a list of rejected ids and rejects all descendants of these IDs func (ts *Topological) rejectTransitively(rejected []ids.ID) { // the rejected array is treated as a queue, with the next element at index // 0 and the last element at the end of the slice. diff --git a/snow/engine/avalanche/bootstrapper.go b/snow/engine/avalanche/bootstrapper.go index dc1c0b1..304172f 100644 --- a/snow/engine/avalanche/bootstrapper.go +++ b/snow/engine/avalanche/bootstrapper.go @@ -30,6 +30,9 @@ type bootstrapper struct { metrics common.Bootstrapper + // vtxReqs prevents asking validators for the same vertex + vtxReqs common.Requests + pending ids.Set finished bool onFinished func() @@ -90,16 +93,22 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) { func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) { b.BootstrapConfig.Context.Log.Verbo("Put called for vertexID %s", vtxID) - if !b.pending.Contains(vtxID) { - return - } - vtx, err := b.State.ParseVertex(vtxBytes) if err != nil { b.BootstrapConfig.Context.Log.Debug("ParseVertex failed due to %s for block:\n%s", err, formatting.DumpBytes{Bytes: vtxBytes}) - b.GetFailed(vdr, requestID, vtxID) + + b.GetFailed(vdr, requestID) + return + } + + if !b.pending.Contains(vtx.ID()) { + b.BootstrapConfig.Context.Log.Debug("Validator %s sent an unrequested vertex:\n%s", + vdr, + formatting.DumpBytes{Bytes: vtxBytes}) + + b.GetFailed(vdr, requestID) return } @@ -107,7 +116,16 @@ func (b 
*bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxB } // GetFailed ... -func (b *bootstrapper) GetFailed(_ ids.ShortID, _ uint32, vtxID ids.ID) { b.sendRequest(vtxID) } +func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) { + vtxID, ok := b.vtxReqs.Remove(vdr, requestID) + if !ok { + b.BootstrapConfig.Context.Log.Debug("GetFailed called without sending the corresponding Get message from %s", + vdr) + return + } + + b.sendRequest(vtxID) +} func (b *bootstrapper) fetch(vtxID ids.ID) { if b.pending.Contains(vtxID) { @@ -131,6 +149,9 @@ func (b *bootstrapper) sendRequest(vtxID ids.ID) { validatorID := validators[0].ID() b.RequestID++ + b.vtxReqs.RemoveAny(vtxID) + b.vtxReqs.Add(validatorID, b.RequestID, vtxID) + b.pending.Add(vtxID) b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, vtxID) diff --git a/snow/engine/avalanche/bootstrapper_test.go b/snow/engine/avalanche/bootstrapper_test.go index 3fa2cbe..c477e05 100644 --- a/snow/engine/avalanche/bootstrapper_test.go +++ b/snow/engine/avalanche/bootstrapper_test.go @@ -289,7 +289,6 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { bs.ForceAccepted(acceptedIDs) state.getVertex = nil - sender.GetF = nil state.parseVertex = func(vtxBytes []byte) (avalanche.Vertex, error) { switch { @@ -1008,3 +1007,111 @@ func TestBootstrapperPartialFetch(t *testing.T) { t.Fatalf("wrong number pending") } } + +func TestBootstrapperWrongIDByzantineResponse(t *testing.T) { + config, peerID, sender, state, _ := newConfig(t) + + vtxID0 := ids.Empty.Prefix(0) + vtxID1 := ids.Empty.Prefix(1) + + vtxBytes0 := []byte{0} + vtxBytes1 := []byte{1} + + vtx0 := &Vtx{ + id: vtxID0, + height: 0, + status: choices.Processing, + bytes: vtxBytes0, + } + vtx1 := &Vtx{ + id: vtxID1, + height: 0, + status: choices.Processing, + bytes: vtxBytes1, + } + + bs := bootstrapper{} + bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry()) + 
bs.Initialize(config) + + acceptedIDs := ids.Set{} + acceptedIDs.Add( + vtxID0, + ) + + state.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + switch { + case vtxID.Equals(vtxID0): + return nil, errUnknownVertex + default: + t.Fatal(errUnknownVertex) + panic(errUnknownVertex) + } + } + + requestID := new(uint32) + sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) { + if !vdr.Equals(peerID) { + t.Fatalf("Should have requested vertex from %s, requested from %s", peerID, vdr) + } + switch { + case vtxID.Equals(vtxID0): + default: + t.Fatalf("Requested unknown vertex") + } + + *requestID = reqID + } + + bs.ForceAccepted(acceptedIDs) + + state.getVertex = nil + sender.GetF = nil + + state.parseVertex = func(vtxBytes []byte) (avalanche.Vertex, error) { + switch { + case bytes.Equal(vtxBytes, vtxBytes0): + return vtx0, nil + case bytes.Equal(vtxBytes, vtxBytes1): + return vtx1, nil + } + t.Fatal(errParsedUnknownVertex) + return nil, errParsedUnknownVertex + } + + state.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + switch { + case vtxID.Equals(vtxID0): + return vtx0, nil + case vtxID.Equals(vtxID1): + return vtx1, nil + default: + t.Fatal(errUnknownVertex) + panic(errUnknownVertex) + } + } + + finished := new(bool) + bs.onFinished = func() { *finished = true } + sender.CantGet = false + + bs.Put(peerID, *requestID, vtxID0, vtxBytes1) + + sender.CantGet = true + + bs.Put(peerID, *requestID, vtxID0, vtxBytes0) + + state.parseVertex = nil + state.edge = nil + bs.onFinished = nil + + if !*finished { + t.Fatalf("Bootstrapping should have finished") + } + if vtx0.Status() != choices.Accepted { + t.Fatalf("Vertex should be accepted") + } + if vtx1.Status() != choices.Processing { + t.Fatalf("Vertex should be processing") + } +} diff --git a/snow/engine/avalanche/issuer.go b/snow/engine/avalanche/issuer.go index 4be29b3..57e2179 100644 --- a/snow/engine/avalanche/issuer.go +++ b/snow/engine/avalanche/issuer.go @@ -76,10 +76,8 @@ func (i 
*issuer) Update() { } i.t.RequestID++ - polled := false if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) { i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes()) - polled = true } else if numVdrs < p.K { i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID) } @@ -89,9 +87,7 @@ func (i *issuer) Update() { i.t.txBlocked.Fulfill(tx.ID()) } - if polled && len(i.t.polls.m) < i.t.Params.ConcurrentRepolls { - i.t.repoll() - } + i.t.repoll() } type vtxIssuer struct{ i *issuer } diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index e4f3ccc..be4bac9 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -23,8 +23,10 @@ type Transitive struct { polls polls // track people I have asked for their preference // vtxReqs prevents asking validators for the same vertex + vtxReqs common.Requests + // missingTxs tracks transaction that are missing - vtxReqs, missingTxs, pending ids.Set + missingTxs, pending ids.Set // vtxBlocked tracks operations that are blocked on vertices // txBlocked tracks operations that are blocked on transactions @@ -115,22 +117,27 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxByt t.Config.Context.Log.Debug("ParseVertex failed due to %s for block:\n%s", err, formatting.DumpBytes{Bytes: vtxBytes}) - t.GetFailed(vdr, requestID, vtxID) + t.GetFailed(vdr, requestID) return } t.insertFrom(vdr, vtx) } // GetFailed implements the Engine interface -func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, vtxID ids.ID) { +func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) { if !t.bootstrapped { - t.bootstrapper.GetFailed(vdr, requestID, vtxID) + t.bootstrapper.GetFailed(vdr, requestID) + return + } + + vtxID, ok := t.vtxReqs.Remove(vdr, requestID) + if !ok { + t.Config.Context.Log.Warn("GetFailed called without sending the 
corresponding Get message from %s", + vdr) return } - t.pending.Remove(vtxID) t.vtxBlocked.Abandon(vtxID) - t.vtxReqs.Remove(vtxID) if t.vtxReqs.Len() == 0 { for _, txID := range t.missingTxs.List() { @@ -142,7 +149,6 @@ func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, vtxID ids.ID) // Track performance statistics t.numVtxRequests.Set(float64(t.vtxReqs.Len())) t.numTxRequests.Set(float64(t.missingTxs.Len())) - t.numBlockedVtx.Set(float64(t.pending.Len())) } // PullQuery implements the Engine interface @@ -167,14 +173,22 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID) } // PushQuery implements the Engine interface -func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtx []byte) { +func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxBytes []byte) { if !t.bootstrapped { t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", vtxID) return } - t.Put(vdr, requestID, vtxID, vtx) - t.PullQuery(vdr, requestID, vtxID) + vtx, err := t.Config.State.ParseVertex(vtxBytes) + if err != nil { + t.Config.Context.Log.Warn("ParseVertex failed due to %s for block:\n%s", + err, + formatting.DumpBytes{Bytes: vtxBytes}) + return + } + t.insertFrom(vdr, vtx) + + t.PullQuery(vdr, requestID, vtx.ID()) } // Chits implements the Engine interface @@ -220,8 +234,16 @@ func (t *Transitive) Notify(msg common.Message) { } func (t *Transitive) repoll() { + if len(t.polls.m) >= t.Params.ConcurrentRepolls { + return + } + txs := t.Config.VM.PendingTxs() t.batch(txs, false /*=force*/, true /*=empty*/) + + for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ { + t.batch(nil, false /*=force*/, true /*=empty*/) + } } func (t *Transitive) reinsertFrom(vdr ids.ShortID, vtxID ids.ID) bool { @@ -266,7 +288,7 @@ func (t *Transitive) insert(vtx avalanche.Vertex) { vtxID := vtx.ID() t.pending.Add(vtxID) - t.vtxReqs.Remove(vtxID) + t.vtxReqs.RemoveAny(vtxID) i := &issuer{ t: 
t, @@ -395,10 +417,10 @@ func (t *Transitive) sendRequest(vdr ids.ShortID, vtxID ids.ID) { return } - t.vtxReqs.Add(vtxID) + t.RequestID++ + + t.vtxReqs.Add(vdr, t.RequestID, vtxID) + t.Config.Sender.Get(vdr, t.RequestID, vtxID) t.numVtxRequests.Set(float64(t.vtxReqs.Len())) // Tracks performance statistics - - t.RequestID++ - t.Config.Sender.Get(vdr, t.RequestID, vtxID) } diff --git a/snow/engine/avalanche/transitive_test.go b/snow/engine/avalanche/transitive_test.go index 4c95c89..7f27fe1 100644 --- a/snow/engine/avalanche/transitive_test.go +++ b/snow/engine/avalanche/transitive_test.go @@ -40,6 +40,7 @@ func TestEngineShutdown(t *testing.T) { t.Fatal("Shutting down the Transitive did not shutdown the VM") } } + func TestEngineAdd(t *testing.T) { config := DefaultConfig() @@ -85,7 +86,9 @@ func TestEngineAdd(t *testing.T) { } asked := new(bool) - sender.GetF = func(inVdr ids.ShortID, _ uint32, vtxID ids.ID) { + reqID := new(uint32) + sender.GetF = func(inVdr ids.ShortID, requestID uint32, vtxID ids.ID) { + *reqID = requestID if *asked { t.Fatalf("Asked multiple times") } @@ -119,7 +122,7 @@ func TestEngineAdd(t *testing.T) { st.parseVertex = func(b []byte) (avalanche.Vertex, error) { return nil, errFailedParsing } - te.Put(vdr.ID(), 0, vtx.parents[0].ID(), nil) + te.Put(vdr.ID(), *reqID, vtx.parents[0].ID(), nil) st.parseVertex = nil @@ -485,7 +488,9 @@ func TestEngineMultipleQuery(t *testing.T) { } asked := new(bool) - sender.GetF = func(inVdr ids.ShortID, _ uint32, vtxID ids.ID) { + reqID := new(uint32) + sender.GetF = func(inVdr ids.ShortID, requestID uint32, vtxID ids.ID) { + *reqID = requestID if *asked { t.Fatalf("Asked multiple times") } @@ -512,7 +517,7 @@ func TestEngineMultipleQuery(t *testing.T) { // Should be dropped because the query was marked as failed te.Chits(vdr1.ID(), *queryRequestID, s0) - te.GetFailed(vdr0.ID(), 0, vtx1.ID()) + te.GetFailed(vdr0.ID(), *reqID) if vtx0.Status() != choices.Accepted { t.Fatalf("Should have executed vertex") @@ 
-598,6 +603,12 @@ func TestEngineAbandonResponse(t *testing.T) { st := &stateTest{t: t} config.State = st + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + sender.Default(true) + gVtx := &Vtx{ id: GenerateID(), status: choices.Accepted, @@ -629,8 +640,13 @@ func TestEngineAbandonResponse(t *testing.T) { te.Initialize(config) te.finishBootstrapping() + reqID := new(uint32) + sender.GetF = func(vID ids.ShortID, requestID uint32, vtxID ids.ID) { + *reqID = requestID + } + te.PullQuery(vdr.ID(), 0, vtx.ID()) - te.GetFailed(vdr.ID(), 0, vtx.ID()) + te.GetFailed(vdr.ID(), *reqID) if len(te.vtxBlocked) != 0 { t.Fatalf("Should have removed blocking event") @@ -2098,7 +2114,7 @@ func TestEngineReissueAbortedVertex(t *testing.T) { sender.GetF = nil st.parseVertex = nil - te.GetFailed(vdrID, *requestID, vtxID0) + te.GetFailed(vdrID, *requestID) requested := new(bool) sender.GetF = func(_ ids.ShortID, _ uint32, vtxID ids.ID) { @@ -2587,3 +2603,375 @@ func TestEngineGossip(t *testing.T) { t.Fatalf("Should have gossiped the vertex") } } + +func TestEngineInvalidVertexIgnoredFromUnexpectedPeer(t *testing.T) { + config := DefaultConfig() + + vdr := validators.GenerateRandomValidator(1) + secondVdr := validators.GenerateRandomValidator(1) + + vals := validators.NewSet() + config.Validators = vals + + vals.Add(vdr) + vals.Add(secondVdr) + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + st := &stateTest{t: t} + config.State = st + + gVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + bytes: []byte{0}, + } + + vts := []avalanche.Vertex{gVtx} + utxos := []ids.ID{GenerateID(), GenerateID()} + + tx0 := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx0.Ins.Add(utxos[0]) + + tx1 := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx1.Ins.Add(utxos[1]) + + vtx0 := &Vtx{ + parents: vts, + id: GenerateID(), + txs: 
[]snowstorm.Tx{tx0}, + height: 1, + status: choices.Unknown, + bytes: []byte{1}, + } + + vtx1 := &Vtx{ + parents: []avalanche.Vertex{vtx0}, + id: GenerateID(), + txs: []snowstorm.Tx{tx1}, + height: 2, + status: choices.Processing, + bytes: []byte{2}, + } + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + parsed := new(bool) + st.parseVertex = func(b []byte) (avalanche.Vertex, error) { + switch { + case bytes.Equal(b, vtx1.Bytes()): + *parsed = true + return vtx1, nil + } + return nil, errUnknownVertex + } + + st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + if !*parsed { + return nil, errUnknownVertex + } + + switch { + case vtxID.Equals(vtx1.ID()): + return vtx1, nil + } + return nil, errUnknownVertex + } + + reqID := new(uint32) + sender.GetF = func(reqVdr ids.ShortID, requestID uint32, vtxID ids.ID) { + *reqID = requestID + if !reqVdr.Equals(vdr.ID()) { + t.Fatalf("Wrong validator requested") + } + if !vtxID.Equals(vtx0.ID()) { + t.Fatalf("Wrong vertex requested") + } + } + + te.PushQuery(vdr.ID(), 0, vtx1.ID(), vtx1.Bytes()) + + te.Put(secondVdr.ID(), *reqID, vtx0.ID(), []byte{3}) + + *parsed = false + st.parseVertex = func(b []byte) (avalanche.Vertex, error) { + switch { + case bytes.Equal(b, vtx0.Bytes()): + *parsed = true + return vtx0, nil + } + return nil, errUnknownVertex + } + + st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + if !*parsed { + return nil, errUnknownVertex + } + + switch { + case vtxID.Equals(vtx0.ID()): + return vtx0, nil + } + return nil, errUnknownVertex + } + sender.CantPushQuery = false + sender.CantChits = false + + vtx0.status = choices.Processing + + te.Put(vdr.ID(), *reqID, vtx0.ID(), vtx0.Bytes()) + + prefs := te.Consensus.Preferences() + if !prefs.Contains(vtx1.ID()) { + t.Fatalf("Shouldn't have abandoned the pending vertex") + } +} + +func TestEnginePushQueryRequestIDConflict(t *testing.T) { + config := DefaultConfig() + + vdr := validators.GenerateRandomValidator(1) + + 
vals := validators.NewSet() + config.Validators = vals + + vals.Add(vdr) + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + st := &stateTest{t: t} + config.State = st + + gVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + bytes: []byte{0}, + } + + vts := []avalanche.Vertex{gVtx} + utxos := []ids.ID{GenerateID(), GenerateID()} + + tx0 := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx0.Ins.Add(utxos[0]) + + tx1 := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx1.Ins.Add(utxos[1]) + + vtx0 := &Vtx{ + parents: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx0}, + height: 1, + status: choices.Unknown, + bytes: []byte{1}, + } + + vtx1 := &Vtx{ + parents: []avalanche.Vertex{vtx0}, + id: GenerateID(), + txs: []snowstorm.Tx{tx1}, + height: 2, + status: choices.Processing, + bytes: []byte{2}, + } + + randomVtxID := GenerateID() + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + parsed := new(bool) + st.parseVertex = func(b []byte) (avalanche.Vertex, error) { + switch { + case bytes.Equal(b, vtx1.Bytes()): + *parsed = true + return vtx1, nil + } + return nil, errUnknownVertex + } + + st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + if !*parsed { + return nil, errUnknownVertex + } + + switch { + case vtxID.Equals(vtx1.ID()): + return vtx1, nil + } + return nil, errUnknownVertex + } + + reqID := new(uint32) + sender.GetF = func(reqVdr ids.ShortID, requestID uint32, vtxID ids.ID) { + *reqID = requestID + if !reqVdr.Equals(vdr.ID()) { + t.Fatalf("Wrong validator requested") + } + if !vtxID.Equals(vtx0.ID()) { + t.Fatalf("Wrong vertex requested") + } + } + + te.PushQuery(vdr.ID(), 0, vtx1.ID(), vtx1.Bytes()) + + sender.GetF = nil + sender.CantGet = false + + te.PushQuery(vdr.ID(), *reqID, randomVtxID, []byte{3}) + + *parsed = false + st.parseVertex = func(b []byte) 
(avalanche.Vertex, error) { + switch { + case bytes.Equal(b, vtx0.Bytes()): + *parsed = true + return vtx0, nil + } + return nil, errUnknownVertex + } + + st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + if !*parsed { + return nil, errUnknownVertex + } + + switch { + case vtxID.Equals(vtx0.ID()): + return vtx0, nil + } + return nil, errUnknownVertex + } + sender.CantPushQuery = false + sender.CantChits = false + + vtx0.status = choices.Processing + + te.Put(vdr.ID(), *reqID, vtx0.ID(), vtx0.Bytes()) + + prefs := te.Consensus.Preferences() + if !prefs.Contains(vtx1.ID()) { + t.Fatalf("Shouldn't have abandoned the pending vertex") + } +} + +func TestEngineAggressivePolling(t *testing.T) { + config := DefaultConfig() + + config.Params.ConcurrentRepolls = 3 + + vdr := validators.GenerateRandomValidator(1) + + vals := validators.NewSet() + config.Validators = vals + + vals.Add(vdr) + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + st := &stateTest{t: t} + config.State = st + + gVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + bytes: []byte{0}, + } + + vts := []avalanche.Vertex{gVtx} + utxos := []ids.ID{GenerateID(), GenerateID()} + + tx0 := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx0.Ins.Add(utxos[0]) + + tx1 := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx1.Ins.Add(utxos[1]) + + vtx := &Vtx{ + parents: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx0}, + height: 1, + status: choices.Processing, + bytes: []byte{1}, + } + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + parsed := new(bool) + st.parseVertex = func(b []byte) (avalanche.Vertex, error) { + switch { + case bytes.Equal(b, vtx.Bytes()): + *parsed = true + return vtx, nil + } + return nil, errUnknownVertex + } + + st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + if !*parsed { + return 
nil, errUnknownVertex + } + + switch { + case vtxID.Equals(vtx.ID()): + return vtx, nil + } + return nil, errUnknownVertex + } + + numPushQueries := new(int) + sender.PushQueryF = func(ids.ShortSet, uint32, ids.ID, []byte) { *numPushQueries++ } + + numPullQueries := new(int) + sender.PullQueryF = func(ids.ShortSet, uint32, ids.ID) { *numPullQueries++ } + + te.Put(vdr.ID(), 0, vtx.ID(), vtx.Bytes()) + + if *numPushQueries != 1 { + t.Fatalf("should have issued one push query") + } + if *numPullQueries != 2 { + t.Fatalf("should have issued two pull queries") + } +} diff --git a/snow/engine/avalanche/voter.go b/snow/engine/avalanche/voter.go index 6e52a34..b14ef40 100644 --- a/snow/engine/avalanche/voter.go +++ b/snow/engine/avalanche/voter.go @@ -59,10 +59,7 @@ func (v *voter) Update() { } v.t.Config.Context.Log.Verbo("Avalanche engine can't quiesce") - - if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls { - v.t.repoll() - } + v.t.repoll() } func (v *voter) bubbleVotes(votes ids.UniqueBag) ids.UniqueBag { diff --git a/snow/engine/common/engine.go b/snow/engine/common/engine.go index 06761de..d0d41c0 100644 --- a/snow/engine/common/engine.go +++ b/snow/engine/common/engine.go @@ -34,75 +34,181 @@ type ExternalHandler interface { // FrontierHandler defines how a consensus engine reacts to frontier messages // from other validators type FrontierHandler interface { - // GetAcceptedFrontier notifies this consensus engine that its accepted - // frontier is requested by the specified validator + // Notify this engine of a request for the accepted frontier of vertices. + // + // The accepted frontier is the set of accepted vertices that do not have + // any accepted descendants. + // + // This function can be called by any validator. It is not safe to assume + // this message is utilizing a unique requestID. However, the validatorID is + // assumed to be authenticated. 
+ // + // This engine should respond with an AcceptedFrontier message with the same + // requestID, and the engine's current accepted frontier. GetAcceptedFrontier(validatorID ids.ShortID, requestID uint32) - // AcceptedFrontier notifies this consensus engine of the specified - // validators current accepted frontier - AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) + // Notify this engine of an accepted frontier. + // + // This function can be called by any validator. It is not safe to assume + // this message is in response to a GetAcceptedFrontier message, is utilizing a + // unique requestID, or that the containerIDs are from a valid frontier. + // However, the validatorID is assumed to be authenticated. + AcceptedFrontier( + validatorID ids.ShortID, + requestID uint32, + containerIDs ids.Set, + ) - // GetAcceptedFrontierFailed notifies this consensus engine that the - // requested accepted frontier from the specified validator should be - // considered lost + // Notify this engine that a get accepted frontier request it issued has + // failed. + // + // This function will be called if the engine sent a GetAcceptedFrontier + // message that is not anticipated to be responded to. This could be because + // the recipient of the message is unknown or if the message request has + // timed out. + // + // The validatorID, and requestID, are assumed to be the same as those sent + // in the GetAcceptedFrontier message. GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) } // AcceptedHandler defines how a consensus engine reacts to messages pertaining // to accepted containers from other validators type AcceptedHandler interface { - // GetAccepted notifies this consensus engine that it should send the set of - // containerIDs that it has accepted from the provided set to the specified - // validator + // Notify this engine of a request to filter non-accepted vertices. + // + // This function can be called by any validator. 
It is not safe to assume + // this message is utilizing a unique requestID. However, the validatorID is + // assumed to be authenticated. + // + // This engine should respond with an Accepted message with the same + // requestID, and the subset of the containerIDs that this node has decided + // are accepted. GetAccepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) - // Accepted notifies this consensus engine of a set of accepted containerIDs + // Notify this engine of a set of accepted vertices. + // + // This function can be called by any validator. It is not safe to assume + // this message is in response to a GetAccepted message, is utilizing a + // unique requestID, or that the containerIDs are a subset of the + // containerIDs from a GetAccepted message. However, the validatorID is + // assumed to be authenticated. Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) - // GetAcceptedFailed notifies this consensus engine that the requested - // accepted containers requested from the specified validator should be - // considered lost + // Notify this engine that a get accepted request it issued has failed. + // + // This function will be called if the engine sent a GetAccepted message + // that is not anticipated to be responded to. This could be because the + // recipient of the message is unknown or if the message request has timed + // out. + // + // The validatorID, and requestID, are assumed to be the same as those sent + // in the GetAccepted message. GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) } // FetchHandler defines how a consensus engine reacts to retrieval messages from // other validators type FetchHandler interface { - // Get notifies this consensus engine that the specified validator requested - // that this engine send the specified container to it + // Notify this engine of a request for a container. + // + // This function can be called by any validator. 
It is not safe to assume + // this message is utilizing a unique requestID. It is also not safe to + // assume the requested containerID exists. However, the validatorID is + // assumed to be authenticated. + // + // There should never be a situation where a virtuous node sends a Get + // request to another virtuous node that does not have the requested + // container. Unless that container was pruned from the active set. + // + // This engine should respond with a Put message with the same requestID if + // the container was locally available. Otherwise, the message can be safely + // dropped. Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) - // Put the container with the specified ID and body. + // Notify this engine of a container. + // + // This function can be called by any validator. It is not safe to assume + // this message is utilizing a unique requestID or even that the containerID + // matches the ID of the container bytes. However, the validatorID is + // assumed to be authenticated. + // + // This engine needs to request and receive missing ancestors of the // container before adding the container to consensus. Once all ancestor // containers are added, pushes the container into the consensus. - Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) + Put( + validatorID ids.ShortID, + requestID uint32, + containerID ids.ID, + container []byte, + ) // Notify this engine that a get request it issued has failed. - GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID) + // + // This function will be called if the engine sent a Get message that is not + // anticipated to be responded to. This could be because the recipient of + // the message is unknown or if the message request has timed out. + // + // The validatorID and requestID are assumed to be the same as those sent in + // the Get message. 
+ GetFailed(validatorID ids.ShortID, requestID uint32) } // QueryHandler defines how a consensus engine reacts to query messages from // other validators type QueryHandler interface { - // Notify this engine that the specified validator queried it about the - // specified container. That is, the validator would like to know whether - // this engine prefers the specified container. If the ancestry of the - // container is incomplete, or the container is unknown, request the missing - // data. Once complete, sends this validator the current preferences. + // Notify this engine of a request for our preferences. + // + // This function can be called by any validator. It is not safe to assume + // this message is utilizing a unique requestID. However, the validatorID is + // assumed to be authenticated. + // + // If the container or its ancestry is incomplete, this engine is expected + // to request the missing containers from the validator. Once the ancestry + // is complete, this engine should send this validator the current + // preferences in a Chits message. The Chits message should have the same + // requestID that was passed in here. PullQuery(validatorID ids.ShortID, requestID uint32, containerID ids.ID) - // Notify this engine that the specified validator queried it about the - // specified container. That is, the validator would like to know whether - // this engine prefers the specified container. If the ancestry of the - // container is incomplete, request it. Once complete, sends this validator - // the current preferences. - PushQuery(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) + // Notify this engine of a request for our preferences. + // + // This function can be called by any validator. It is not safe to assume + // this message is utilizing a unique requestID or even that the containerID + // matches the ID of the container bytes. However, the validatorID is + // assumed to be authenticated. 
+ // + // This function is meant to behave the same way as PullQuery, except the + // container is optimistically provided to potentially remove the need for + // a series of Get/Put messages. + // + // If the ancestry of the container is incomplete, this engine is expected + // to request the ancestry from the validator. Once the ancestry is + // complete, this engine should send this validator the current preferences + // in a Chits message. The Chits message should have the same requestID that + // was passed in here. + PushQuery( + validatorID ids.ShortID, + requestID uint32, + containerID ids.ID, + container []byte, + ) // Notify this engine of the specified validators preferences. + // + // This function can be called by any validator. It is not safe to assume + // this message is in response to a PullQuery or a PushQuery message. + // However, the validatorID is assumed to be authenticated. Chits(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) // Notify this engine that a query it issued has failed. + // + // This function will be called if the engine sent a PullQuery or PushQuery + // message that is not anticipated to be responded to. This could be because + // the recipient of the message is unknown or if the message request has + // timed out. + // + // The validatorID and the requestID are assumed to be the same as those + // sent in the Query message. QueryFailed(validatorID ids.ShortID, requestID uint32) } @@ -110,14 +216,19 @@ type QueryHandler interface { // other components of this validator type InternalHandler interface { // Startup this engine. + // + // This function will be called once the environment is configured to be + // able to run the engine. Startup() // Gossip to the network a container on the accepted frontier Gossip() // Shutdown this engine. + // + // This function will be called when the environment is exiting. Shutdown() - // Notify this engine that the vm has sent a message to it. 
+ // Notify this engine of a message from the virtual machine. Notify(Message) } diff --git a/snow/engine/common/requests.go b/snow/engine/common/requests.go new file mode 100644 index 0000000..22d5759 --- /dev/null +++ b/snow/engine/common/requests.go @@ -0,0 +1,88 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package common + +import ( + "github.com/ava-labs/gecko/ids" +) + +type req struct { + vdr ids.ShortID + id uint32 +} + +// Requests tracks pending container messages from a peer. +type Requests struct { + reqsToID map[[20]byte]map[uint32]ids.ID + idToReq map[[32]byte]req +} + +// Add a request. Assumes that requestIDs are unique. Assumes that containerIDs +// are only in one request at a time. +func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) { + if r.reqsToID == nil { + r.reqsToID = make(map[[20]byte]map[uint32]ids.ID) + } + vdrKey := vdr.Key() + vdrReqs, ok := r.reqsToID[vdrKey] + if !ok { + vdrReqs = make(map[uint32]ids.ID) + r.reqsToID[vdrKey] = vdrReqs + } + vdrReqs[requestID] = containerID + + if r.idToReq == nil { + r.idToReq = make(map[[32]byte]req) + } + r.idToReq[containerID.Key()] = req{ + vdr: vdr, + id: requestID, + } +} + +// Remove attempts to abandon a requestID sent to a validator. If the request is +// currently outstanding, the requested ID will be returned along with true. If +// the request isn't currently outstanding, false will be returned. +func (r *Requests) Remove(vdr ids.ShortID, requestID uint32) (ids.ID, bool) { + vdrKey := vdr.Key() + vdrReqs, ok := r.reqsToID[vdrKey] + if !ok { + return ids.ID{}, false + } + containerID, ok := vdrReqs[requestID] + if !ok { + return ids.ID{}, false + } + + if len(vdrReqs) == 1 { + delete(r.reqsToID, vdrKey) + } else { + delete(vdrReqs, requestID) + } + + delete(r.idToReq, containerID.Key()) + return containerID, true +} + +// RemoveAny outstanding requests for the container ID. 
True is returned if the +// container ID had an outstanding request. +func (r *Requests) RemoveAny(containerID ids.ID) bool { + req, ok := r.idToReq[containerID.Key()] + if !ok { + return false + } + + r.Remove(req.vdr, req.id) + return true +} + +// Len returns the total number of outstanding requests. +func (r *Requests) Len() int { return len(r.idToReq) } + +// Contains returns true if there is an outstanding request for the container +// ID. +func (r *Requests) Contains(containerID ids.ID) bool { + _, ok := r.idToReq[containerID.Key()] + return ok +} diff --git a/snow/engine/common/requests_test.go b/snow/engine/common/requests_test.go new file mode 100644 index 0000000..334edf5 --- /dev/null +++ b/snow/engine/common/requests_test.go @@ -0,0 +1,90 @@ +// (c) 2019-2020, Ava Labs, Inc. All rights reserved. +// See the file LICENSE for licensing terms. + +package common + +import ( + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/ava-labs/gecko/ids" +) + +func TestRequests(t *testing.T) { + req := Requests{} + + length := req.Len() + assert.Equal(t, 0, length, "should have had no outstanding requests") + + _, removed := req.Remove(ids.ShortEmpty, 0) + assert.False(t, removed, "shouldn't have removed the request") + + removed = req.RemoveAny(ids.Empty) + assert.False(t, removed, "shouldn't have removed the request") + + constains := req.Contains(ids.Empty) + assert.False(t, constains, "shouldn't contain this request") + + req.Add(ids.ShortEmpty, 0, ids.Empty) + + length = req.Len() + assert.Equal(t, 1, length, "should have had one outstanding request") + + _, removed = req.Remove(ids.ShortEmpty, 1) + assert.False(t, removed, "shouldn't have removed the request") + + _, removed = req.Remove(ids.NewShortID([20]byte{1}), 0) + assert.False(t, removed, "shouldn't have removed the request") + + constains = req.Contains(ids.Empty) + assert.True(t, constains, "should contain this request") + + length = req.Len() + assert.Equal(t, 1, length, "should have 
had one outstanding request") + + req.Add(ids.ShortEmpty, 10, ids.Empty.Prefix(0)) + + length = req.Len() + assert.Equal(t, 2, length, "should have had two outstanding requests") + + _, removed = req.Remove(ids.ShortEmpty, 1) + assert.False(t, removed, "shouldn't have removed the request") + + _, removed = req.Remove(ids.NewShortID([20]byte{1}), 0) + assert.False(t, removed, "shouldn't have removed the request") + + constains = req.Contains(ids.Empty) + assert.True(t, constains, "should contain this request") + + length = req.Len() + assert.Equal(t, 2, length, "should have had two outstanding requests") + + removedID, removed := req.Remove(ids.ShortEmpty, 0) + assert.True(t, removedID.Equals(ids.Empty), "should have removed the requested ID") + assert.True(t, removed, "should have removed the request") + + removedID, removed = req.Remove(ids.ShortEmpty, 10) + assert.True(t, removedID.Equals(ids.Empty.Prefix(0)), "should have removed the requested ID") + assert.True(t, removed, "should have removed the request") + + length = req.Len() + assert.Equal(t, 0, length, "should have had no outstanding requests") + + req.Add(ids.ShortEmpty, 0, ids.Empty) + + length = req.Len() + assert.Equal(t, 1, length, "should have had one outstanding request") + + removed = req.RemoveAny(ids.Empty) + assert.True(t, removed, "should have removed the request") + + length = req.Len() + assert.Equal(t, 0, length, "should have had no outstanding requests") + + removed = req.RemoveAny(ids.Empty) + assert.False(t, removed, "shouldn't have removed the request") + + length = req.Len() + assert.Equal(t, 0, length, "should have had no outstanding requests") +} diff --git a/snow/engine/common/test_engine.go b/snow/engine/common/test_engine.go index 27a07f5..1bbd060 100644 --- a/snow/engine/common/test_engine.go +++ b/snow/engine/common/test_engine.go @@ -42,7 +42,8 @@ type EngineTest struct { StartupF, GossipF, ShutdownF func() ContextF func() *snow.Context NotifyF func(Message) - GetF, GetFailedF, 
PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) + GetF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) + GetFailedF func(validatorID ids.ShortID, requestID uint32) PutF, PushQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) GetAcceptedFrontierF, GetAcceptedFrontierFailedF, GetAcceptedFailedF, QueryFailedF func(validatorID ids.ShortID, requestID uint32) AcceptedFrontierF, GetAcceptedF, AcceptedF, ChitsF func(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) @@ -187,9 +188,9 @@ func (e *EngineTest) Get(validatorID ids.ShortID, requestID uint32, containerID } // GetFailed ... -func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID) { +func (e *EngineTest) GetFailed(validatorID ids.ShortID, requestID uint32) { if e.GetFailedF != nil { - e.GetFailedF(validatorID, requestID, containerID) + e.GetFailedF(validatorID, requestID) } else if e.CantGetFailed && e.T != nil { e.T.Fatalf("Unexpectedly called GetFailed") } diff --git a/snow/engine/snowman/bootstrapper.go b/snow/engine/snowman/bootstrapper.go index 92db2d3..7442eea 100644 --- a/snow/engine/snowman/bootstrapper.go +++ b/snow/engine/snowman/bootstrapper.go @@ -20,6 +20,9 @@ type BootstrapConfig struct { // Blocked tracks operations that are blocked on blocks Blocked *queue.Jobs + // blocks that have outstanding get requests + blkReqs common.Requests + VM ChainVM Bootstrapped func() @@ -84,16 +87,22 @@ func (b *bootstrapper) ForceAccepted(acceptedContainerIDs ids.Set) { func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) { b.BootstrapConfig.Context.Log.Verbo("Put called for blkID %s", blkID) - if !b.pending.Contains(blkID) { - return - } - blk, err := b.VM.ParseBlock(blkBytes) if err != nil { b.BootstrapConfig.Context.Log.Debug("ParseBlock failed due to %s for block:\n%s", err, formatting.DumpBytes{Bytes: blkBytes}) - 
b.GetFailed(vdr, requestID, blkID) + + b.GetFailed(vdr, requestID) + return + } + + if !b.pending.Contains(blk.ID()) { + b.BootstrapConfig.Context.Log.Debug("Validator %s sent an unrequested block:\n%s", + vdr, + formatting.DumpBytes{Bytes: blkBytes}) + + b.GetFailed(vdr, requestID) return } @@ -101,7 +110,15 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkB } // GetFailed ... -func (b *bootstrapper) GetFailed(_ ids.ShortID, _ uint32, blkID ids.ID) { b.sendRequest(blkID) } +func (b *bootstrapper) GetFailed(vdr ids.ShortID, requestID uint32) { + blkID, ok := b.blkReqs.Remove(vdr, requestID) + if !ok { + b.BootstrapConfig.Context.Log.Debug("GetFailed called without sending the corresponding Get message from %s", + vdr) + return + } + b.sendRequest(blkID) +} func (b *bootstrapper) fetch(blkID ids.ID) { if b.pending.Contains(blkID) { @@ -125,6 +142,9 @@ func (b *bootstrapper) sendRequest(blkID ids.ID) { validatorID := validators[0].ID() b.RequestID++ + b.blkReqs.RemoveAny(blkID) + b.blkReqs.Add(validatorID, b.RequestID, blkID) + b.pending.Add(blkID) b.BootstrapConfig.Sender.Get(validatorID, b.RequestID, blkID) diff --git a/snow/engine/snowman/bootstrapper_test.go b/snow/engine/snowman/bootstrapper_test.go index 19fa608..3d9752d 100644 --- a/snow/engine/snowman/bootstrapper_test.go +++ b/snow/engine/snowman/bootstrapper_test.go @@ -223,12 +223,13 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { bs.ForceAccepted(acceptedIDs) vm.GetBlockF = nil - sender.GetF = nil vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) { switch { case bytes.Equal(blkBytes, blkBytes1): return blk1, nil + case bytes.Equal(blkBytes, blkBytes2): + return blk2, nil } t.Fatal(errUnknownBlock) return nil, errUnknownBlock @@ -477,3 +478,113 @@ func TestBootstrapperPartialFetch(t *testing.T) { t.Fatalf("wrong number pending") } } + +func TestBootstrapperWrongIDByzantineResponse(t *testing.T) { + config, peerID, sender, vm := newConfig(t) + + 
blkID0 := ids.Empty.Prefix(0) + blkID1 := ids.Empty.Prefix(1) + blkID2 := ids.Empty.Prefix(2) + + blkBytes0 := []byte{0} + blkBytes1 := []byte{1} + blkBytes2 := []byte{2} + + blk0 := &Blk{ + id: blkID0, + height: 0, + status: choices.Accepted, + bytes: blkBytes0, + } + blk1 := &Blk{ + parent: blk0, + id: blkID1, + height: 1, + status: choices.Processing, + bytes: blkBytes1, + } + blk2 := &Blk{ + parent: blk1, + id: blkID2, + height: 2, + status: choices.Processing, + bytes: blkBytes2, + } + + bs := bootstrapper{} + bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry()) + bs.Initialize(config) + + acceptedIDs := ids.Set{} + acceptedIDs.Add(blkID1) + + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + switch { + case blkID.Equals(blkID1): + return nil, errUnknownBlock + default: + t.Fatal(errUnknownBlock) + panic(errUnknownBlock) + } + } + + requestID := new(uint32) + sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) { + if !vdr.Equals(peerID) { + t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr) + } + switch { + case vtxID.Equals(blkID1): + default: + t.Fatalf("Requested unknown block") + } + + *requestID = reqID + } + + bs.ForceAccepted(acceptedIDs) + + vm.GetBlockF = nil + sender.GetF = nil + + vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) { + switch { + case bytes.Equal(blkBytes, blkBytes2): + return blk2, nil + } + t.Fatal(errUnknownBlock) + return nil, errUnknownBlock + } + + sender.CantGet = false + + bs.Put(peerID, *requestID, blkID1, blkBytes2) + + sender.CantGet = true + + vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) { + switch { + case bytes.Equal(blkBytes, blkBytes1): + return blk1, nil + } + t.Fatal(errUnknownBlock) + return nil, errUnknownBlock + } + + finished := new(bool) + bs.onFinished = func() { *finished = true } + + bs.Put(peerID, *requestID, blkID1, blkBytes1) + + vm.ParseBlockF = nil + + if 
!*finished { + t.Fatalf("Bootstrapping should have finished") + } + if blk1.Status() != choices.Accepted { + t.Fatalf("Block should be accepted") + } + if blk2.Status() != choices.Processing { + t.Fatalf("Block should be processing") + } +} diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index abcd0e6..c03533b 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -19,12 +19,22 @@ type Transitive struct { Config bootstrapper - polls polls // track people I have asked for their preference + // track outstanding preference requests + polls polls - blkReqs, pending ids.Set // prevent asking validators for the same block + // blocks that have outstanding get requests + blkReqs common.Requests - blocked events.Blocker // track operations that are blocked on blocks + // blocks that are fetched but haven't been issued due to missing + // dependencies + pending ids.Set + // operations that are blocked on a block being issued. This could be + // issuing another block, responding to a query, or applying votes to + // consensus + blocked events.Blocker + + // mark for if the engine has been bootstrapped or not bootstrapped bool } @@ -33,7 +43,11 @@ func (t *Transitive) Initialize(config Config) { config.Context.Log.Info("Initializing Snowman consensus") t.Config = config - t.metrics.Initialize(config.Context.Log, config.Params.Namespace, config.Params.Metrics) + t.metrics.Initialize( + config.Context.Log, + config.Params.Namespace, + config.Params.Metrics, + ) t.onFinished = t.finishBootstrapping t.bootstrapper.Initialize(config.BootstrapConfig) @@ -44,11 +58,19 @@ func (t *Transitive) Initialize(config Config) { t.polls.m = make(map[uint32]poll) } +// when bootstrapping is finished, this will be called. This initializes the +// consensus engine with the last accepted block. 
func (t *Transitive) finishBootstrapping() { + // set the bootstrapped mark to switch consensus modes t.bootstrapped = true + + // initialize consensus to the last accepted blockID tailID := t.Config.VM.LastAccepted() t.Consensus.Initialize(t.Config.Context, t.Params, tailID) + // to maintain the invariant that oracle blocks are issued in the correct + // preferences, we need to handle the case that we are bootstrapping into an + // oracle block tail, err := t.Config.VM.GetBlock(tailID) if err != nil { t.Config.Context.Log.Error("Failed to get last accepted block due to: %s", err) @@ -58,9 +80,12 @@ func (t *Transitive) finishBootstrapping() { switch blk := tail.(type) { case OracleBlock: for _, blk := range blk.Options() { + // note that deliver will set the VM's preference t.deliver(blk) } default: + // if there aren't blocks we need to deliver on startup, we need to set + // the preference to the last accepted block t.Config.VM.SetPreference(tailID) } @@ -91,15 +116,27 @@ func (t *Transitive) Context() *snow.Context { return t.Config.Context } // Get implements the Engine interface func (t *Transitive) Get(vdr ids.ShortID, requestID uint32, blkID ids.ID) { - if blk, err := t.Config.VM.GetBlock(blkID); err == nil { - t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes()) + blk, err := t.Config.VM.GetBlock(blkID) + if err != nil { + // If we failed to get the block, that means either an unexpected error + // has occurred, the validator is not following the protocol, or the + // block has been pruned. + t.Config.Context.Log.Warn("Get called for blockID %s errored with %s", + blkID, + err) + return } + + // Respond to the validator with the fetched block and the same requestID. 
+ t.Config.Sender.Put(vdr, requestID, blkID, blk.Bytes()) } // Put implements the Engine interface func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) { t.Config.Context.Log.Verbo("Put called for blockID %s", blkID) + // if the engine hasn't been bootstrapped, forward the request to the + // bootstrapper if !t.bootstrapped { t.bootstrapper.Put(vdr, requestID, blkID, blkBytes) return @@ -110,32 +147,53 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkByt t.Config.Context.Log.Debug("ParseBlock failed due to %s for block:\n%s", err, formatting.DumpBytes{Bytes: blkBytes}) - t.GetFailed(vdr, requestID, blkID) + + // because GetFailed doesn't utilize the assumption that we actually + // sent a Get message, we can safely call GetFailed here to potentially + // abandon the request. + t.GetFailed(vdr, requestID) return } + // insert the block into consensus. If the block has already been issued, + // this will be a noop. If this block has missing dependencies, vdr will + // receive requests to fill the ancestry. dependencies that have already + // been fetched, but with missing dependencies themselves won't be requested + // from the vdr. t.insertFrom(vdr, blk) } // GetFailed implements the Engine interface -func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32, blkID ids.ID) { +func (t *Transitive) GetFailed(vdr ids.ShortID, requestID uint32) { + // if the engine hasn't been bootstrapped, forward the request to the + // bootstrapper if !t.bootstrapped { - t.bootstrapper.GetFailed(vdr, requestID, blkID) + t.bootstrapper.GetFailed(vdr, requestID) return } - t.pending.Remove(blkID) - t.blocked.Abandon(blkID) - t.blkReqs.Remove(blkID) + // we don't use the assumption that this function is called after a failed + // Get message. 
So we first check to see if we have an outstanding request + // and also get what the request was for if it exists + blkID, ok := t.blkReqs.Remove(vdr, requestID) + if !ok { + t.Config.Context.Log.Warn("GetFailed called without sending the corresponding Get message from %s", + vdr) + return + } - // Tracks performance statistics - t.numBlockedBlk.Set(float64(t.pending.Len())) + // because the get request was dropped, we no longer expect blkID to + // be issued. + t.blocked.Abandon(blkID) } // PullQuery implements the Engine interface func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) { + // if the engine hasn't been bootstrapped, we aren't ready to respond to + // queries if !t.bootstrapped { - t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", blkID) + t.Config.Context.Log.Debug("Dropping PullQuery for %s due to bootstrapping", + blkID) return } @@ -146,6 +204,8 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) requestID: requestID, } + // if we aren't able to have issued this block, then it is a dependency for + // this reply if !t.reinsertFrom(vdr, blkID) { c.deps.Add(blkID) } @@ -154,18 +214,37 @@ func (t *Transitive) PullQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID) } // PushQuery implements the Engine interface -func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blk []byte) { +func (t *Transitive) PushQuery(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) { + // if the engine hasn't been bootstrapped, we aren't ready to respond to + // queries if !t.bootstrapped { t.Config.Context.Log.Debug("Dropping PushQuery for %s due to bootstrapping", blkID) return } - t.Put(vdr, requestID, blkID, blk) - t.PullQuery(vdr, requestID, blkID) + blk, err := t.Config.VM.ParseBlock(blkBytes) + // If the parsing fails, we just drop the request, as we didn't ask for it + if err != nil { + t.Config.Context.Log.Warn("ParseBlock failed 
due to %s for block:\n%s", + err, + formatting.DumpBytes{Bytes: blkBytes}) + return + } + + // insert the block into consensus. If the block has already been issued, + // this will be a noop. If this block has missing dependencies, vdr will + // receive requests to fill the ancestry. dependencies that have already + // been fetched, but with missing dependencies themselves won't be requested + // from the vdr. + t.insertFrom(vdr, blk) + + // register the chit request + t.PullQuery(vdr, requestID, blk.ID()) } // Chits implements the Engine interface func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { + // if the engine hasn't been bootstrapped, we shouldn't be receiving chits if !t.bootstrapped { t.Config.Context.Log.Debug("Dropping Chits due to bootstrapping") return @@ -173,7 +252,14 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { // Since this is snowman, there should only be one ID in the vote set if votes.Len() != 1 { - t.Config.Context.Log.Debug("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d", votes.Len(), vdr, requestID) + t.Config.Context.Log.Debug("Chits was called with the wrong number of votes %d. ValidatorID: %s, RequestID: %d", + votes.Len(), + vdr, + requestID) + + // because QueryFailed doesn't utilize the assumption that we actually + // sent a Query message, we can safely call QueryFailed here to + // potentially abandon the request. 
t.QueryFailed(vdr, requestID) return } @@ -188,6 +274,8 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { response: vote, } + // if we aren't able to have issued the vote's block, then it is a + // dependency for applying the vote if !t.reinsertFrom(vdr, vote) { v.deps.Add(vote) } @@ -197,6 +285,7 @@ func (t *Transitive) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) { // QueryFailed implements the Engine interface func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) { + // if the engine hasn't been bootstrapped, we won't have sent a query if !t.bootstrapped { t.Config.Context.Log.Warn("Dropping QueryFailed due to bootstrapping") return @@ -211,6 +300,7 @@ func (t *Transitive) QueryFailed(vdr ids.ShortID, requestID uint32) { // Notify implements the Engine interface func (t *Transitive) Notify(msg common.Message) { + // if the engine hasn't been bootstrapped, we shouldn't be issuing blocks if !t.bootstrapped { t.Config.Context.Log.Warn("Dropping Notify due to bootstrapping") return @@ -219,21 +309,32 @@ func (t *Transitive) Notify(msg common.Message) { t.Config.Context.Log.Verbo("Snowman engine notified of %s from the vm", msg) switch msg { case common.PendingTxs: - if blk, err := t.Config.VM.BuildBlock(); err == nil { - if status := blk.Status(); status != choices.Processing { - t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status) - } - parentID := blk.Parent().ID() - if pref := t.Consensus.Preference(); !parentID.Equals(pref) { - t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref) - } - if t.insertAll(blk) { - t.Config.Context.Log.Verbo("Successfully issued new block from the VM") - } else { - t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors") - } - } else { + // the pending txs message means we should attempt to build a block. 
+ blk, err := t.Config.VM.BuildBlock() + if err != nil { t.Config.Context.Log.Verbo("VM.BuildBlock errored with %s", err) + return + } + + // a newly created block is expected to be processing. If this check + // fails, there is potentially an error in the VM this engine is running + if status := blk.Status(); status != choices.Processing { + t.Config.Context.Log.Warn("Attempting to issue a block with status: %s, expected Processing", status) + } + + // the newly created block should be built on top of the preferred + // block. Otherwise, the new block doesn't have the best chance of being + // confirmed. + parentID := blk.Parent().ID() + if pref := t.Consensus.Preference(); !parentID.Equals(pref) { + t.Config.Context.Log.Warn("Built block with parent: %s, expected %s", parentID, pref) + } + + // inserting the block shouldn't have any missing dependencies + if t.insertAll(blk) { + t.Config.Context.Log.Verbo("Successfully issued new block from the VM") + } else { + t.Config.Context.Log.Warn("VM.BuildBlock returned a block that is pending for ancestors") } default: t.Config.Context.Log.Warn("Unexpected message from the VM: %s", msg) @@ -241,10 +342,20 @@ func (t *Transitive) Notify(msg common.Message) { } func (t *Transitive) repoll() { + // if we are issuing a repoll, we should gossip our current preferences to + // propagate the most likely branch as quickly as possible prefID := t.Consensus.Preference() - t.pullSample(prefID) + + for i := len(t.polls.m); i < t.Params.ConcurrentRepolls; i++ { + t.pullSample(prefID) + } } +// reinsertFrom attempts to issue the branch ending with a block, from only its +// ID, to consensus. Returns true if the block was added, or was previously +// added, to consensus. This is useful to check the local DB before requesting a +// block in case we have the block for some reason. If the block or a dependency +// is missing, the validator will be sent a Get message. 
func (t *Transitive) reinsertFrom(vdr ids.ShortID, blkID ids.ID) bool { blk, err := t.Config.VM.GetBlock(blkID) if err != nil { @@ -254,44 +365,81 @@ func (t *Transitive) reinsertFrom(vdr ids.ShortID, blkID ids.ID) bool { return t.insertFrom(vdr, blk) } +// insertFrom attempts to issue the branch ending with a block to consensus. +// Returns true if the block was added, or was previously added, to consensus. +// This is useful to check the local DB before requesting a block in case we +// have the block for some reason. If a dependency is missing, the validator +// will be sent a Get message. func (t *Transitive) insertFrom(vdr ids.ShortID, blk snowman.Block) bool { blkID := blk.ID() + // if the block has been issued, we don't need to insert it. if the block is + // already pending, we shouldn't attempt to insert it again yet for !t.Consensus.Issued(blk) && !t.pending.Contains(blkID) { t.insert(blk) - parent := blk.Parent() - parentID := parent.ID() - if parentStatus := parent.Status(); !parentStatus.Fetched() { - t.sendRequest(vdr, parentID) + blk = blk.Parent() + blkID = blk.ID() + + // if the parent hasn't been fetched, we need to request it to issue the + // newly inserted block + if !blk.Status().Fetched() { + t.sendRequest(vdr, blkID) return false } - - blk = parent - blkID = parentID } - return !t.pending.Contains(blkID) + return t.Consensus.Issued(blk) } +// insertAll attempts to issue the branch ending with a block to consensus. +// Returns true if the block was added, or was previously added, to consensus. +// This is useful to check the local DB before requesting a block in case we +// have the block for some reason. If a dependency is missing and the dependency +// hasn't been requested, the issuance will be abandoned. 
func (t *Transitive) insertAll(blk snowman.Block) bool { blkID := blk.ID() for blk.Status().Fetched() && !t.Consensus.Issued(blk) && !t.pending.Contains(blkID) { t.insert(blk) + blk = blk.Parent() + blkID = blk.ID() } - return !t.pending.Contains(blkID) + + // if issuance of the block was successful, this is the happy path + if t.Consensus.Issued(blk) { + return true + } + + // if this branch is waiting on a block that we supposedly have a source of, + // we can just wait for that request to succeed or fail + if t.blkReqs.Contains(blkID) { + return false + } + + // if we have no reason to expect that this block will be inserted, we + // should abandon the block to avoid a memory leak + t.blocked.Abandon(blkID) + return false } +// attempt to insert the block to consensus. If the block's parent hasn't been +// issued, the insertion will block until the parent's issuance is abandoned or +// fulfilled func (t *Transitive) insert(blk snowman.Block) { blkID := blk.ID() + // mark that the block has been fetched but is pending t.pending.Add(blkID) - t.blkReqs.Remove(blkID) + + // if we have any outstanding requests for this block, remove the pending + // requests + t.blkReqs.RemoveAny(blkID) i := &issuer{ t: t, blk: blk, } + // block on the parent if needed if parent := blk.Parent(); !t.Consensus.Issued(parent) { parentID := parent.ID() t.Config.Context.Log.Verbo("Block waiting for parent %s", parentID) @@ -306,17 +454,22 @@ } func (t *Transitive) sendRequest(vdr ids.ShortID, blkID ids.ID) { - if !t.blkReqs.Contains(blkID) { - t.blkReqs.Add(blkID) - - t.numBlkRequests.Set(float64(t.blkReqs.Len())) // Tracks performance statistics - - t.RequestID++ - t.Config.Context.Log.Verbo("Sending Get message for %s", blkID) - t.Config.Sender.Get(vdr, t.RequestID, blkID) + // only send one request at a time for a block + if t.blkReqs.Contains(blkID) { + return } + + t.Config.Context.Log.Verbo("Sending Get message for %s", blkID) + + 
t.RequestID++ + t.blkReqs.Add(vdr, t.RequestID, blkID) + t.Config.Sender.Get(vdr, t.RequestID, blkID) + + // Tracks performance statistics + t.numBlkRequests.Set(float64(t.blkReqs.Len())) } +// send a pull request for this block ID func (t *Transitive) pullSample(blkID ids.ID) { t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators) p := t.Consensus.Parameters() @@ -326,15 +479,22 @@ func (t *Transitive) pullSample(blkID ids.ID) { vdrSet.Add(vdr.ID()) } - t.RequestID++ - if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) { - t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID) - } else if numVdrs < p.K { + if numVdrs := len(vdrs); numVdrs != p.K { t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID) + return } + + t.RequestID++ + if !t.polls.Add(t.RequestID, vdrSet.Len()) { + t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID) + return + } + + t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID) } -func (t *Transitive) pushSample(blk snowman.Block) bool { +// send a push request for this block +func (t *Transitive) pushSample(blk snowman.Block) { t.Config.Context.Log.Verbo("About to sample from: %s", t.Config.Validators) p := t.Consensus.Parameters() vdrs := t.Config.Validators.Sample(p.K) @@ -343,15 +503,20 @@ func (t *Transitive) pushSample(blk snowman.Block) bool { vdrSet.Add(vdr.ID()) } - t.RequestID++ - queryIssued := false - if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) { - t.Config.Sender.PushQuery(vdrSet, t.RequestID, blk.ID(), blk.Bytes()) - queryIssued = true - } else if numVdrs < p.K { - t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blk.ID()) + blkID := blk.ID() + if numVdrs := len(vdrs); numVdrs != p.K { + t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID) + 
return } - return queryIssued + + t.RequestID++ + if !t.polls.Add(t.RequestID, vdrSet.Len()) { + t.Config.Context.Log.Error("Query for %s was dropped due to use of a duplicated requestID", blkID) + return + } + + t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes()) + return } func (t *Transitive) deliver(blk snowman.Block) { @@ -359,11 +524,14 @@ return } + // we are adding the block to consensus, so it is no longer pending blkID := blk.ID() t.pending.Remove(blkID) if err := blk.Verify(); err != nil { t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err) + + // if verify fails, then all descendants are also invalid t.blocked.Abandon(blkID) t.numBlockedBlk.Set(float64(t.pending.Len())) // Tracks performance statistics return @@ -371,8 +539,10 @@ t.Config.Context.Log.Verbo("Adding block to consensus: %s", blkID) t.Consensus.Add(blk) - polled := t.pushSample(blk) + // Add all the oracle blocks if they exist. We call verify on all the blocks + // and add them to consensus before marking anything as fulfilled to avoid + // any potential reentrant bugs. 
added := []snowman.Block{} dropped := []snowman.Block{} switch blk := blk.(type) { @@ -380,20 +550,23 @@ func (t *Transitive) deliver(blk snowman.Block) { for _, blk := range blk.Options() { if err := blk.Verify(); err != nil { t.Config.Context.Log.Debug("Block failed verification due to %s, dropping block", err) - t.blocked.Abandon(blk.ID()) dropped = append(dropped, blk) } else { t.Consensus.Add(blk) - t.pushSample(blk) added = append(added, blk) } } } t.Config.VM.SetPreference(t.Consensus.Preference()) - t.blocked.Fulfill(blkID) + // launch a query for the newly added block + t.pushSample(blk) + + t.blocked.Fulfill(blkID) for _, blk := range added { + t.pushSample(blk) + blkID := blk.ID() t.pending.Remove(blkID) t.blocked.Fulfill(blkID) @@ -404,9 +577,8 @@ func (t *Transitive) deliver(blk snowman.Block) { t.blocked.Abandon(blkID) } - if polled && len(t.polls.m) < t.Params.ConcurrentRepolls { - t.repoll() - } + // If we should issue multiple queries at the same time, we need to repoll + t.repoll() // Tracks performance statistics t.numBlkRequests.Set(float64(t.blkReqs.Len())) diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 047d689..b68d08e 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -102,7 +102,9 @@ func TestEngineAdd(t *testing.T) { } asked := new(bool) - sender.GetF = func(inVdr ids.ShortID, _ uint32, blkID ids.ID) { + reqID := new(uint32) + sender.GetF = func(inVdr ids.ShortID, requestID uint32, blkID ids.ID) { + *reqID = requestID if *asked { t.Fatalf("Asked multiple times") } @@ -136,7 +138,7 @@ func TestEngineAdd(t *testing.T) { vm.ParseBlockF = func(b []byte) (snowman.Block, error) { return nil, errParseBlock } - te.Put(vdr.ID(), 0, blk.Parent().ID(), nil) + te.Put(vdr.ID(), *reqID, blk.Parent().ID(), nil) vm.ParseBlockF = nil @@ -906,7 +908,11 @@ func TestEngineAbandonQuery(t *testing.T) { panic("Should have failed") } } - sender.CantGet = false + + 
reqID := new(uint32) + sender.GetF = func(_ ids.ShortID, requestID uint32, _ ids.ID) { + *reqID = requestID + } te.PullQuery(vdr.ID(), 0, blkID) @@ -914,7 +920,7 @@ func TestEngineAbandonQuery(t *testing.T) { t.Fatalf("Should have blocked on request") } - te.GetFailed(vdr.ID(), 0, blkID) + te.GetFailed(vdr.ID(), *reqID) if len(te.blocked) != 0 { t.Fatalf("Should have removed request") @@ -947,7 +953,12 @@ func TestEngineAbandonChit(t *testing.T) { panic("Should have failed") } } - sender.CantGet = false + + reqID := new(uint32) + sender.GetF = func(_ ids.ShortID, requestID uint32, _ ids.ID) { + *reqID = requestID + } + fakeBlkIDSet := ids.Set{} fakeBlkIDSet.Add(fakeBlkID) te.Chits(vdr.ID(), 0, fakeBlkIDSet) @@ -956,7 +967,7 @@ func TestEngineAbandonChit(t *testing.T) { t.Fatalf("Should have blocked on request") } - te.GetFailed(vdr.ID(), 0, fakeBlkID) + te.GetFailed(vdr.ID(), *reqID) if len(te.blocked) != 0 { t.Fatalf("Should have removed request") @@ -1105,14 +1116,18 @@ func TestEngineRetryFetch(t *testing.T) { } vm.CantGetBlock = false - sender.CantGet = false + + reqID := new(uint32) + sender.GetF = func(_ ids.ShortID, requestID uint32, _ ids.ID) { + *reqID = requestID + } te.PullQuery(vdr.ID(), 0, missingBlk.ID()) vm.CantGetBlock = true - sender.CantGet = true + sender.GetF = nil - te.GetFailed(vdr.ID(), 0, missingBlk.ID()) + te.GetFailed(vdr.ID(), *reqID) vm.CantGetBlock = false @@ -1124,7 +1139,7 @@ func TestEngineRetryFetch(t *testing.T) { te.PullQuery(vdr.ID(), 0, missingBlk.ID()) vm.CantGetBlock = true - sender.CantGet = true + sender.GetF = nil if !*called { t.Fatalf("Should have requested the block again") @@ -1220,3 +1235,290 @@ func TestEngineGossip(t *testing.T) { t.Fatalf("Should have gossiped the block") } } + +func TestEngineInvalidBlockIgnoredFromUnexpectedPeer(t *testing.T) { + vdr, vdrs, sender, vm, te, gBlk := setup(t) + + secondVdr := validators.GenerateRandomValidator(1) + vdrs.Add(secondVdr) + + sender.Default(true) + + missingBlk := &Blk{ 
+ parent: gBlk, + id: GenerateID(), + height: 1, + status: choices.Unknown, + bytes: []byte{1}, + } + + pendingBlk := &Blk{ + parent: missingBlk, + id: GenerateID(), + height: 2, + status: choices.Processing, + bytes: []byte{2}, + } + + parsed := new(bool) + vm.ParseBlockF = func(b []byte) (snowman.Block, error) { + switch { + case bytes.Equal(b, pendingBlk.Bytes()): + *parsed = true + return pendingBlk, nil + } + return nil, errUnknownBlock + } + + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + if !*parsed { + return nil, errUnknownBlock + } + + switch { + case blkID.Equals(pendingBlk.ID()): + return pendingBlk, nil + } + return nil, errUnknownBlock + } + + reqID := new(uint32) + sender.GetF = func(reqVdr ids.ShortID, requestID uint32, blkID ids.ID) { + *reqID = requestID + if !reqVdr.Equals(vdr.ID()) { + t.Fatalf("Wrong validator requested") + } + if !blkID.Equals(missingBlk.ID()) { + t.Fatalf("Wrong block requested") + } + } + + te.PushQuery(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes()) + + te.Put(secondVdr.ID(), *reqID, missingBlk.ID(), []byte{3}) + + *parsed = false + vm.ParseBlockF = func(b []byte) (snowman.Block, error) { + switch { + case bytes.Equal(b, missingBlk.Bytes()): + *parsed = true + return missingBlk, nil + } + return nil, errUnknownBlock + } + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + if !*parsed { + return nil, errUnknownBlock + } + + switch { + case blkID.Equals(missingBlk.ID()): + return missingBlk, nil + } + return nil, errUnknownBlock + } + sender.CantPushQuery = false + sender.CantChits = false + + missingBlk.status = choices.Processing + + te.Put(vdr.ID(), *reqID, missingBlk.ID(), missingBlk.Bytes()) + + pref := te.Consensus.Preference() + if !pref.Equals(pendingBlk.ID()) { + t.Fatalf("Shouldn't have abandoned the pending block") + } +} + +func TestEnginePushQueryRequestIDConflict(t *testing.T) { + vdr, _, sender, vm, te, gBlk := setup(t) + + sender.Default(true) + + missingBlk := &Blk{ + parent: gBlk, 
+ id: GenerateID(), + height: 1, + status: choices.Unknown, + bytes: []byte{1}, + } + + pendingBlk := &Blk{ + parent: missingBlk, + id: GenerateID(), + height: 2, + status: choices.Processing, + bytes: []byte{2}, + } + + randomBlkID := GenerateID() + + parsed := new(bool) + vm.ParseBlockF = func(b []byte) (snowman.Block, error) { + switch { + case bytes.Equal(b, pendingBlk.Bytes()): + *parsed = true + return pendingBlk, nil + } + return nil, errUnknownBlock + } + + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + if !*parsed { + return nil, errUnknownBlock + } + + switch { + case blkID.Equals(pendingBlk.ID()): + return pendingBlk, nil + } + return nil, errUnknownBlock + } + + reqID := new(uint32) + sender.GetF = func(reqVdr ids.ShortID, requestID uint32, blkID ids.ID) { + *reqID = requestID + if !reqVdr.Equals(vdr.ID()) { + t.Fatalf("Wrong validator requested") + } + if !blkID.Equals(missingBlk.ID()) { + t.Fatalf("Wrong block requested") + } + } + + te.PushQuery(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes()) + + sender.GetF = nil + sender.CantGet = false + + te.PushQuery(vdr.ID(), *reqID, randomBlkID, []byte{3}) + + *parsed = false + vm.ParseBlockF = func(b []byte) (snowman.Block, error) { + switch { + case bytes.Equal(b, missingBlk.Bytes()): + *parsed = true + return missingBlk, nil + } + return nil, errUnknownBlock + } + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + if !*parsed { + return nil, errUnknownBlock + } + + switch { + case blkID.Equals(missingBlk.ID()): + return missingBlk, nil + } + return nil, errUnknownBlock + } + sender.CantPushQuery = false + sender.CantChits = false + + te.Put(vdr.ID(), *reqID, missingBlk.ID(), missingBlk.Bytes()) + + pref := te.Consensus.Preference() + if !pref.Equals(pendingBlk.ID()) { + t.Fatalf("Shouldn't have abandoned the pending block") + } +} + +func TestEngineAggressivePolling(t *testing.T) { + config := DefaultConfig() + + config.Params.ConcurrentRepolls = 2 + + vdr := 
validators.GenerateRandomValidator(1) + + vals := validators.NewSet() + config.Validators = vals + + vals.Add(vdr) + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + sender.Default(true) + + vm := &VMTest{} + vm.T = t + config.VM = vm + + vm.Default(true) + vm.CantSetPreference = false + + gBlk := &Blk{ + id: GenerateID(), + status: choices.Accepted, + } + + vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } + sender.CantGetAcceptedFrontier = false + + te := &Transitive{} + + te.Initialize(config) + + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + if !blkID.Equals(gBlk.ID()) { + t.Fatalf("Wrong block requested") + } + return gBlk, nil + } + + te.finishBootstrapping() + + vm.GetBlockF = nil + vm.LastAcceptedF = nil + sender.CantGetAcceptedFrontier = true + + sender.Default(true) + + pendingBlk := &Blk{ + parent: gBlk, + id: GenerateID(), + height: 2, + status: choices.Processing, + bytes: []byte{1}, + } + + parsed := new(bool) + vm.ParseBlockF = func(b []byte) (snowman.Block, error) { + switch { + case bytes.Equal(b, pendingBlk.Bytes()): + *parsed = true + return pendingBlk, nil + } + return nil, errUnknownBlock + } + + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + if !*parsed { + return nil, errUnknownBlock + } + + switch { + case blkID.Equals(pendingBlk.ID()): + return pendingBlk, nil + } + return nil, errUnknownBlock + } + + numPushed := new(int) + sender.PushQueryF = func(_ ids.ShortSet, _ uint32, _ ids.ID, _ []byte) { *numPushed++ } + + numPulled := new(int) + sender.PullQueryF = func(_ ids.ShortSet, _ uint32, _ ids.ID) { *numPulled++ } + + te.Put(vdr.ID(), 0, pendingBlk.ID(), pendingBlk.Bytes()) + + if *numPushed != 1 { + t.Fatalf("Should have initially sent a push query") + } + + if *numPulled != 1 { + t.Fatalf("Should have sent an additional pull query") + } +} diff --git a/snow/engine/snowman/voter.go b/snow/engine/snowman/voter.go index 0c9779a..2c8b209 100644 --- a/snow/engine/snowman/voter.go 
+++ b/snow/engine/snowman/voter.go @@ -56,10 +56,7 @@ func (v *voter) Update() { } v.t.Config.Context.Log.Verbo("Snowman engine can't quiesce") - - if len(v.t.polls.m) < v.t.Config.Params.ConcurrentRepolls { - v.t.repoll() - } + v.t.repoll() } func (v *voter) bubbleVotes(votes ids.Bag) ids.Bag { diff --git a/snow/networking/handler/handler.go b/snow/networking/handler/handler.go index 12c65be..6f46909 100644 --- a/snow/networking/handler/handler.go +++ b/snow/networking/handler/handler.go @@ -78,7 +78,7 @@ func (h *Handler) dispatchMsg(msg message) bool { case getMsg: h.engine.Get(msg.validatorID, msg.requestID, msg.containerID) case getFailedMsg: - h.engine.GetFailed(msg.validatorID, msg.requestID, msg.containerID) + h.engine.GetFailed(msg.validatorID, msg.requestID) case putMsg: h.engine.Put(msg.validatorID, msg.requestID, msg.containerID, msg.container) case pushQueryMsg: @@ -185,12 +185,11 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids } // GetFailed passes a GetFailed message to the consensus engine. 
-func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32, containerID ids.ID) { +func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) { h.msgs <- message{ messageType: getFailedMsg, validatorID: validatorID, requestID: requestID, - containerID: containerID, } } diff --git a/snow/networking/router/router.go b/snow/networking/router/router.go index fca6983..d007f54 100644 --- a/snow/networking/router/router.go +++ b/snow/networking/router/router.go @@ -42,6 +42,6 @@ type ExternalRouter interface { type InternalRouter interface { GetAcceptedFrontierFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) - GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) + GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) } diff --git a/snow/networking/router/subnet_router.go b/snow/networking/router/subnet_router.go index 2d9e68a..ac95141 100644 --- a/snow/networking/router/subnet_router.go +++ b/snow/networking/router/subnet_router.go @@ -187,13 +187,13 @@ func (sr *ChainRouter) Put(validatorID ids.ShortID, chainID ids.ID, requestID ui // GetFailed routes an incoming GetFailed message from the validator with ID [validatorID] // to the consensus engine working on the chain with ID [chainID] -func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) { +func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, requestID uint32) { sr.lock.RLock() defer sr.lock.RUnlock() sr.timeouts.Cancel(validatorID, chainID, requestID) if chain, exists := sr.chains[chainID.Key()]; exists { - chain.GetFailed(validatorID, requestID, containerID) + chain.GetFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) } diff --git 
a/snow/networking/sender/sender.go b/snow/networking/sender/sender.go index ed500c3..4036e73 100644 --- a/snow/networking/sender/sender.go +++ b/snow/networking/sender/sender.go @@ -88,7 +88,7 @@ func (s *Sender) Get(validatorID ids.ShortID, requestID uint32, containerID ids. // Add a timeout -- if we don't get a response before the timeout expires, // send this consensus engine a GetFailed message s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { - s.router.GetFailed(validatorID, s.ctx.ChainID, requestID, containerID) + s.router.GetFailed(validatorID, s.ctx.ChainID, requestID) }) s.sender.Get(validatorID, s.ctx.ChainID, requestID, containerID) }