From 27eb3ca6eece3eb24104b9c397b7ade5a359e0d6 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Tue, 10 Mar 2020 16:10:53 -0400 Subject: [PATCH 01/15] fixed bugs added for the bug bounty --- bugs.txt | 9 ++ snow/consensus/avalanche/topological.go | 19 +++ snow/consensus/avalanche/topological_test.go | 72 +++++++++++ snow/consensus/snowball/unary_snowball.go | 2 + .../consensus/snowball/unary_snowball_test.go | 30 ++++- snow/engine/avalanche/bootstrapper.go | 6 + snow/engine/avalanche/bootstrapper_test.go | 108 ++++++++++++++++ snow/engine/avalanche/issuer.go | 7 +- snow/engine/avalanche/polls.go | 22 ++-- snow/engine/avalanche/transitive_test.go | 117 ++++++++++++++++++ snow/engine/snowman/bootstrapper.go | 6 + snow/engine/snowman/bootstrapper_test.go | 110 ++++++++++++++++ snow/engine/snowman/polls.go | 33 +++-- snow/engine/snowman/transitive.go | 14 ++- snow/engine/snowman/transitive_test.go | 112 +++++++++++++++++ 15 files changed, 630 insertions(+), 37 deletions(-) create mode 100644 bugs.txt diff --git a/bugs.txt b/bugs.txt new file mode 100644 index 0000000..984fcef --- /dev/null +++ b/bugs.txt @@ -0,0 +1,9 @@ +Added bugs: + +- Inside of gecko/snow/consensus/avalanche/topological.go#Topological.pushVotes the votes must be properly filtered. Specifically, a byzantine node should not be able to vote for two different transactions that conflict with each other during the same poll. If a node votes for conflicting transactions during the same poll, either all the node's votes should be dropped, or the votes for the conflicting transactions should be dropped. + +- Inside of gecko/snow/consensus/snowball/unary_snowball.go#unarySnowball.Extend the confidence and bias fields should have been set to the values in the unary snowball instance. + +- Inside of gecko/snow/engine/avalanche/bootstrapper.go#bootstrapper.Put and gecko/snow/engine/snowman/bootstrapper.go#bootstrapper.Put the engine must check that the provided vtx/blk ID is the ID of the parsed container. Otherwise, a byzantine node could send a container and report the wrong container ID for it. This would allow an unintended container to be marked as accepted during bootstrapping. + +- Inside of gecko/snow/engine/avalanche/polls.go#poll and gecko/snow/engine/snowman/polls.go#poll the poll should only allow a validator to vote once per poll. Also, this validator must have been part of the set of validators that was polled during the query.
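
[Editor's note: the last bullet above is addressed by the snow/engine/avalanche/polls.go and snow/engine/snowman/polls.go hunks later in this patch, which replace the pending-response counter with the set of polled validators. The following is a minimal, self-contained Go sketch of that "one vote per polled validator" rule only; the map-based types are simplified stand-ins for Gecko's ids.ShortSet / ids.UniqueBag and this is not the engine code itself.]

package main

import "fmt"

// poll tracks the validators sampled for a single query and the chits
// received so far. A chit is counted only if the responding validator was
// sampled and has not already voted in this poll.
type poll struct {
	votes  map[string]int      // containerID -> number of chits received
	polled map[string]struct{} // validators still allowed to respond
}

func newPoll(validators []string) *poll {
	p := &poll{votes: map[string]int{}, polled: map[string]struct{}{}}
	for _, v := range validators {
		p.polled[v] = struct{}{}
	}
	return p
}

// Vote drops chits from validators that were never polled or already voted.
func (p *poll) Vote(vdr, containerID string) {
	if _, ok := p.polled[vdr]; !ok {
		return
	}
	delete(p.polled, vdr) // each validator may vote at most once per poll
	p.votes[containerID]++
}

// Finished reports whether every sampled validator has responded.
func (p *poll) Finished() bool { return len(p.polled) == 0 }

func main() {
	p := newPoll([]string{"vdr0", "vdr1"})
	p.Vote("vdr0", "blkA")
	p.Vote("vdr0", "blkA") // duplicate chit from vdr0: ignored
	p.Vote("vdr2", "blkA") // vdr2 was never polled: ignored
	p.Vote("vdr1", "blkA")
	fmt.Println(p.votes["blkA"], p.Finished()) // prints: 2 true
}

[End of note; the patch content resumes below.]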
\ No newline at end of file diff --git a/snow/consensus/avalanche/topological.go b/snow/consensus/avalanche/topological.go index dca8a19..5e6212c 100644 --- a/snow/consensus/avalanche/topological.go +++ b/snow/consensus/avalanche/topological.go @@ -257,6 +257,7 @@ func (ta *Topological) pushVotes( kahnNodes map[[32]byte]kahnNode, leaves []ids.ID) ids.Bag { votes := make(ids.UniqueBag) + txConflicts := make(map[[32]byte]ids.Set) for len(leaves) > 0 { newLeavesSize := len(leaves) - 1 @@ -271,6 +272,12 @@ func (ta *Topological) pushVotes( // Give the votes to the consumer txID := tx.ID() votes.UnionSet(txID, kahn.votes) + + // Map txID to set of Conflicts + txKey := txID.Key() + if _, exists := txConflicts[txKey]; !exists { + txConflicts[txKey] = ta.cg.Conflicts(tx) + } } for _, dep := range vtx.Parents() { @@ -291,6 +298,18 @@ func (ta *Topological) pushVotes( } } + // Create bag of votes for conflicting transactions + conflictingVotes := make(ids.UniqueBag) + for txHash, conflicts := range txConflicts { + txID := ids.NewID(txHash) + for conflictTxHash := range conflicts { + conflictTxID := ids.NewID(conflictTxHash) + conflictingVotes.UnionSet(txID, votes.GetSet(conflictTxID)) + } + } + + votes.Difference(&conflictingVotes) + return votes.Bag(ta.params.Alpha) } diff --git a/snow/consensus/avalanche/topological_test.go b/snow/consensus/avalanche/topological_test.go index f43ee5b..c28d567 100644 --- a/snow/consensus/avalanche/topological_test.go +++ b/snow/consensus/avalanche/topological_test.go @@ -103,6 +103,78 @@ func TestAvalancheVoting(t *testing.T) { } } +func TestAvalancheIgnoreInvalidVoting(t *testing.T) { + params := Parameters{ + Parameters: snowball.Parameters{ + Metrics: prometheus.NewRegistry(), + K: 3, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 1, + }, + Parents: 2, + BatchSize: 1, + } + + vts := []Vertex{&Vtx{ + id: GenerateID(), + status: choices.Accepted, + }, &Vtx{ + id: GenerateID(), + status: choices.Accepted, + }} + utxos := []ids.ID{GenerateID()} + + ta := Topological{} + ta.Initialize(snow.DefaultContextTest(), params, vts) + + tx0 := &snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + } + tx0.Ins.Add(utxos[0]) + + vtx0 := &Vtx{ + dependencies: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx0}, + height: 1, + status: choices.Processing, + } + + tx1 := &snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + } + tx1.Ins.Add(utxos[0]) + + vtx1 := &Vtx{ + dependencies: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx1}, + height: 1, + status: choices.Processing, + } + + ta.Add(vtx0) + ta.Add(vtx1) + + sm := make(ids.UniqueBag) + + sm.Add(0, vtx0.id) + sm.Add(1, vtx1.id) + + // Add Illegal Vote cast by Response 2 + sm.Add(2, vtx0.id) + sm.Add(2, vtx1.id) + + ta.RecordPoll(sm) + + if ta.Finalized() { + t.Fatalf("An avalanche instance finalized too early") + } +} + func TestAvalancheTransitiveVoting(t *testing.T) { params := Parameters{ Parameters: snowball.Parameters{ diff --git a/snow/consensus/snowball/unary_snowball.go b/snow/consensus/snowball/unary_snowball.go index 6d0db07..3999a74 100644 --- a/snow/consensus/snowball/unary_snowball.go +++ b/snow/consensus/snowball/unary_snowball.go @@ -48,9 +48,11 @@ func (sb *unarySnowball) Extend(beta int, choice int) BinarySnowball { snowflake: binarySnowflake{ beta: beta, preference: choice, + confidence: sb.confidence, finalized: sb.Finalized(), }, } + bs.numSuccessfulPolls[choice] = sb.numSuccessfulPolls return bs } diff --git a/snow/consensus/snowball/unary_snowball_test.go 
b/snow/consensus/snowball/unary_snowball_test.go index 8bf098a..224cd4c 100644 --- a/snow/consensus/snowball/unary_snowball_test.go +++ b/snow/consensus/snowball/unary_snowball_test.go @@ -42,11 +42,32 @@ func TestUnarySnowball(t *testing.T) { binarySnowball := sbClone.Extend(beta, 0) + expected := "SB(Preference = 0, NumSuccessfulPolls[0] = 2, NumSuccessfulPolls[1] = 0, SF = SF(Preference = 0, Confidence = 1, Finalized = false))" + if result := binarySnowball.String(); result != expected { + t.Fatalf("Expected:\n%s\nReturned:\n%s", expected, result) + } + binarySnowball.RecordUnsuccessfulPoll() + for i := 0; i < 3; i++ { + if binarySnowball.Preference() != 0 { + t.Fatalf("Wrong preference") + } else if binarySnowball.Finalized() { + t.Fatalf("Should not have finalized") + } + binarySnowball.RecordSuccessfulPoll(1) + binarySnowball.RecordUnsuccessfulPoll() + } + + if binarySnowball.Preference() != 1 { + t.Fatalf("Wrong preference") + } else if binarySnowball.Finalized() { + t.Fatalf("Should not have finalized") + } binarySnowball.RecordSuccessfulPoll(1) - - if binarySnowball.Finalized() { + if binarySnowball.Preference() != 1 { + t.Fatalf("Wrong preference") + } else if binarySnowball.Finalized() { t.Fatalf("Should not have finalized") } @@ -57,4 +78,9 @@ func TestUnarySnowball(t *testing.T) { } else if !binarySnowball.Finalized() { t.Fatalf("Should have finalized") } + + expected = "SB(NumSuccessfulPolls = 2, Confidence = 1, Finalized = false)" + if str := sb.String(); str != expected { + t.Fatalf("Wrong state. Expected:\n%s\nGot:\n%s", expected, str) + } } diff --git a/snow/engine/avalanche/bootstrapper.go b/snow/engine/avalanche/bootstrapper.go index 7d3d7c8..26b202e 100644 --- a/snow/engine/avalanche/bootstrapper.go +++ b/snow/engine/avalanche/bootstrapper.go @@ -103,6 +103,12 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxB return } + if realVtxID := vtx.ID(); !vtxID.Equals(realVtxID) { + b.BootstrapConfig.Context.Log.Warn("Put called for vertexID %s, but provided vertexID %s", vtxID, realVtxID) + b.GetFailed(vdr, requestID, vtxID) + return + } + b.addVertex(vtx) } diff --git a/snow/engine/avalanche/bootstrapper_test.go b/snow/engine/avalanche/bootstrapper_test.go index d1be936..6f581b5 100644 --- a/snow/engine/avalanche/bootstrapper_test.go +++ b/snow/engine/avalanche/bootstrapper_test.go @@ -334,6 +334,114 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { } } +func TestBootstrapperWrongIDByzantineResponse(t *testing.T) { + config, peerID, sender, state, _ := newConfig(t) + + vtxID0 := ids.Empty.Prefix(0) + vtxID1 := ids.Empty.Prefix(1) + + vtxBytes0 := []byte{0} + vtxBytes1 := []byte{1} + + vtx0 := &Vtx{ + id: vtxID0, + height: 0, + status: choices.Processing, + bytes: vtxBytes0, + } + vtx1 := &Vtx{ + id: vtxID1, + height: 0, + status: choices.Processing, + bytes: vtxBytes1, + } + + bs := bootstrapper{} + bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry()) + bs.Initialize(config) + + acceptedIDs := ids.Set{} + acceptedIDs.Add( + vtxID0, + ) + + state.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + switch { + case vtxID.Equals(vtxID0): + return nil, errUnknownVertex + default: + t.Fatal(errUnknownVertex) + panic(errUnknownVertex) + } + } + + requestID := new(uint32) + sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) { + if !vdr.Equals(peerID) { + t.Fatalf("Should have requested vertex from %s, requested from %s", peerID, vdr) + } + switch { + 
case vtxID.Equals(vtxID0): + default: + t.Fatalf("Requested unknown vertex") + } + + *requestID = reqID + } + + bs.ForceAccepted(acceptedIDs) + + state.getVertex = nil + sender.GetF = nil + + state.parseVertex = func(vtxBytes []byte) (avalanche.Vertex, error) { + switch { + case bytes.Equal(vtxBytes, vtxBytes0): + return vtx0, nil + case bytes.Equal(vtxBytes, vtxBytes1): + return vtx1, nil + } + t.Fatal(errParsedUnknownVertex) + return nil, errParsedUnknownVertex + } + + state.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) { + switch { + case vtxID.Equals(vtxID0): + return vtx0, nil + case vtxID.Equals(vtxID1): + return vtx1, nil + default: + t.Fatal(errUnknownVertex) + panic(errUnknownVertex) + } + } + + finished := new(bool) + bs.onFinished = func() { *finished = true } + sender.CantGet = false + + bs.Put(peerID, *requestID, vtxID0, vtxBytes1) + + sender.CantGet = true + + bs.Put(peerID, *requestID, vtxID0, vtxBytes0) + + state.parseVertex = nil + state.edge = nil + bs.onFinished = nil + + if !*finished { + t.Fatalf("Bootstrapping should have finished") + } + if vtx0.Status() != choices.Accepted { + t.Fatalf("Vertex should be accepted") + } + if vtx1.Status() != choices.Processing { + t.Fatalf("Vertex should be processing") + } +} + func TestBootstrapperVertexDependencies(t *testing.T) { config, peerID, sender, state, _ := newConfig(t) diff --git a/snow/engine/avalanche/issuer.go b/snow/engine/avalanche/issuer.go index befe973..f953fe6 100644 --- a/snow/engine/avalanche/issuer.go +++ b/snow/engine/avalanche/issuer.go @@ -64,9 +64,12 @@ func (i *issuer) Update() { vdrSet.Add(vdr.ID()) } + toSample := ids.ShortSet{} // Copy to a new variable because we may remove an element in sender.Sender + toSample.Union(vdrSet) // and we don't want that to affect the set of validators we wait for [ie vdrSet] + i.t.RequestID++ - if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) { - i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes()) + if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet) { + i.t.Config.Sender.PushQuery(toSample, i.t.RequestID, vtxID, i.vtx.Bytes()) } else if numVdrs < p.K { i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID) } diff --git a/snow/engine/avalanche/polls.go b/snow/engine/avalanche/polls.go index 282fe6a..fa1e7df 100644 --- a/snow/engine/avalanche/polls.go +++ b/snow/engine/avalanche/polls.go @@ -38,10 +38,10 @@ type polls struct { // Add to the current set of polls // Returns true if the poll was registered correctly and the network sample // should be made. 
-func (p *polls) Add(requestID uint32, numPolled int) bool { +func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { poll, exists := p.m[requestID] if !exists { - poll.numPending = numPolled + poll.polled = vdrs p.m[requestID] = poll p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -59,7 +59,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.Uni return nil, false } - poll.Vote(votes) + poll.Vote(votes, vdr) if poll.Finished() { p.log.Verbo("Poll is finished") delete(p.m, requestID) @@ -83,19 +83,19 @@ func (p *polls) String() string { // poll represents the current state of a network poll for a vertex type poll struct { - votes ids.UniqueBag - numPending int + votes ids.UniqueBag + polled ids.ShortSet } // Vote registers a vote for this poll -func (p *poll) Vote(votes []ids.ID) { - if p.numPending > 0 { - p.numPending-- - p.votes.Add(uint(p.numPending), votes...) +func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) { + if p.polled.Contains(vdr) { + p.polled.Remove(vdr) + p.votes.Add(uint(p.polled.Len()), votes...) } } // Finished returns true if the poll has completed, with no more required // responses -func (p poll) Finished() bool { return p.numPending <= 0 } -func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.numPending) } +func (p poll) Finished() bool { return p.polled.Len() == 0 } +func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) } diff --git a/snow/engine/avalanche/transitive_test.go b/snow/engine/avalanche/transitive_test.go index 6f5b5ed..15b258b 100644 --- a/snow/engine/avalanche/transitive_test.go +++ b/snow/engine/avalanche/transitive_test.go @@ -2363,3 +2363,120 @@ func TestEngineBootstrappingIntoConsensus(t *testing.T) { sender.PushQueryF = nil st.getVertex = nil } + +func TestEngineDoubleChit(t *testing.T) { + config := DefaultConfig() + + config.Params.Alpha = 2 + config.Params.K = 2 + + vdr0 := validators.GenerateRandomValidator(1) + vdr1 := validators.GenerateRandomValidator(1) + vals := validators.NewSet() + vals.Add(vdr0) + vals.Add(vdr1) + config.Validators = vals + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + sender.Default(true) + sender.CantGetAcceptedFrontier = false + + st := &stateTest{t: t} + config.State = st + + st.Default(true) + + gVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + } + mVtx := &Vtx{ + id: GenerateID(), + status: choices.Accepted, + } + + vts := []avalanche.Vertex{gVtx, mVtx} + utxos := []ids.ID{GenerateID()} + + tx := &TestTx{ + TestTx: snowstorm.TestTx{ + Identifier: GenerateID(), + Stat: choices.Processing, + }, + } + tx.Ins.Add(utxos[0]) + + vtx := &Vtx{ + parents: vts, + id: GenerateID(), + txs: []snowstorm.Tx{tx}, + height: 1, + status: choices.Processing, + bytes: []byte{1, 1, 2, 3}, + } + + st.edge = func() []ids.ID { return []ids.ID{vts[0].ID(), vts[1].ID()} } + st.getVertex = func(id ids.ID) (avalanche.Vertex, error) { + switch { + case id.Equals(gVtx.ID()): + return gVtx, nil + case id.Equals(mVtx.ID()): + return mVtx, nil + } + t.Fatalf("Unknown vertex") + panic("Should have errored") + } + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + reqID := new(uint32) + sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, vtxID ids.ID, _ []byte) { + *reqID = requestID + if inVdrs.Len() != 2 { + t.Fatalf("Wrong number of validators") + } + if !vtxID.Equals(vtx.ID()) { + t.Fatalf("Wrong vertex requested") + } + } + st.getVertex = 
func(id ids.ID) (avalanche.Vertex, error) { + switch { + case id.Equals(vtx.ID()): + return vtx, nil + } + t.Fatalf("Unknown vertex") + panic("Should have errored") + } + + te.insert(vtx) + + votes := ids.Set{} + votes.Add(vtx.ID()) + + if status := tx.Status(); status != choices.Processing { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *reqID, votes) + + if status := tx.Status(); status != choices.Processing { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *reqID, votes) + + if status := tx.Status(); status != choices.Processing { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr1.ID(), *reqID, votes) + + if status := tx.Status(); status != choices.Accepted { + t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Accepted) + } +} diff --git a/snow/engine/snowman/bootstrapper.go b/snow/engine/snowman/bootstrapper.go index 88724ed..2c0415e 100644 --- a/snow/engine/snowman/bootstrapper.go +++ b/snow/engine/snowman/bootstrapper.go @@ -97,6 +97,12 @@ func (b *bootstrapper) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkB return } + if realBlkID := blk.ID(); !blkID.Equals(realBlkID) { + b.BootstrapConfig.Context.Log.Warn("Put called for blockID %s, but provided blockID %s", blkID, realBlkID) + b.GetFailed(vdr, requestID, blkID) + return + } + b.addBlock(blk) } diff --git a/snow/engine/snowman/bootstrapper_test.go b/snow/engine/snowman/bootstrapper_test.go index 9cb0968..a972a23 100644 --- a/snow/engine/snowman/bootstrapper_test.go +++ b/snow/engine/snowman/bootstrapper_test.go @@ -252,6 +252,116 @@ func TestBootstrapperUnknownByzantineResponse(t *testing.T) { } } +func TestBootstrapperWrongIDByzantineResponse(t *testing.T) { + config, peerID, sender, vm := newConfig(t) + + blkID0 := ids.Empty.Prefix(0) + blkID1 := ids.Empty.Prefix(1) + blkID2 := ids.Empty.Prefix(2) + + blkBytes0 := []byte{0} + blkBytes1 := []byte{1} + blkBytes2 := []byte{2} + + blk0 := &Blk{ + id: blkID0, + height: 0, + status: choices.Accepted, + bytes: blkBytes0, + } + blk1 := &Blk{ + parent: blk0, + id: blkID1, + height: 1, + status: choices.Processing, + bytes: blkBytes1, + } + blk2 := &Blk{ + parent: blk1, + id: blkID2, + height: 2, + status: choices.Processing, + bytes: blkBytes2, + } + + bs := bootstrapper{} + bs.metrics.Initialize(config.Context.Log, fmt.Sprintf("gecko_%s", config.Context.ChainID), prometheus.NewRegistry()) + bs.Initialize(config) + + acceptedIDs := ids.Set{} + acceptedIDs.Add(blkID1) + + vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) { + switch { + case blkID.Equals(blkID1): + return nil, errUnknownBlock + default: + t.Fatal(errUnknownBlock) + panic(errUnknownBlock) + } + } + + requestID := new(uint32) + sender.GetF = func(vdr ids.ShortID, reqID uint32, vtxID ids.ID) { + if !vdr.Equals(peerID) { + t.Fatalf("Should have requested block from %s, requested from %s", peerID, vdr) + } + switch { + case vtxID.Equals(blkID1): + default: + t.Fatalf("Requested unknown block") + } + + *requestID = reqID + } + + bs.ForceAccepted(acceptedIDs) + + vm.GetBlockF = nil + sender.GetF = nil + + vm.ParseBlockF = func(blkBytes []byte) (snowman.Block, error) { + switch { + case bytes.Equal(blkBytes, blkBytes2): + return blk2, nil + } + t.Fatal(errUnknownBlock) + return nil, errUnknownBlock + } + + sender.CantGet = false + + bs.Put(peerID, *requestID, blkID1, blkBytes2) + + sender.CantGet = true + + vm.ParseBlockF = func(blkBytes 
[]byte) (snowman.Block, error) { + switch { + case bytes.Equal(blkBytes, blkBytes1): + return blk1, nil + } + t.Fatal(errUnknownBlock) + return nil, errUnknownBlock + } + + finished := new(bool) + bs.onFinished = func() { *finished = true } + + bs.Put(peerID, *requestID, blkID1, blkBytes1) + + vm.ParseBlockF = nil + + if !*finished { + t.Fatalf("Bootstrapping should have finished") + } + if blk1.Status() != choices.Accepted { + t.Fatalf("Block should be accepted") + } + if blk2.Status() != choices.Processing { + t.Fatalf("Block should be processing") + } +} + func TestBootstrapperDependency(t *testing.T) { config, peerID, sender, vm := newConfig(t) diff --git a/snow/engine/snowman/polls.go b/snow/engine/snowman/polls.go index 6e666dc..6765ff7 100644 --- a/snow/engine/snowman/polls.go +++ b/snow/engine/snowman/polls.go @@ -22,11 +22,11 @@ type polls struct { // Add to the current set of polls // Returns true if the poll was registered correctly and the network sample // should be made. -func (p *polls) Add(requestID uint32, numPolled int) bool { +func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { poll, exists := p.m[requestID] if !exists { poll.alpha = p.alpha - poll.numPolled = numPolled + poll.polled = vdrs p.m[requestID] = poll p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -42,7 +42,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, b if !exists { return ids.Bag{}, false } - poll.Vote(vote) + poll.Vote(vote, vdr) if poll.Finished() { delete(p.m, requestID) p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -60,7 +60,7 @@ func (p *polls) CancelVote(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) { return ids.Bag{}, false } - poll.CancelVote() + poll.CancelVote(vdr) if poll.Finished() { delete(p.m, requestID) p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -83,22 +83,18 @@ func (p *polls) String() string { // poll represents the current state of a network poll for a block type poll struct { - alpha int - votes ids.Bag - numPolled int + alpha int + votes ids.Bag + polled ids.ShortSet } // Vote registers a vote for this poll -func (p *poll) CancelVote() { - if p.numPolled > 0 { - p.numPolled-- - } -} +func (p *poll) CancelVote(vdr ids.ShortID) { p.polled.Remove(vdr) } // Vote registers a vote for this poll -func (p *poll) Vote(vote ids.ID) { - if p.numPolled > 0 { - p.numPolled-- +func (p *poll) Vote(vote ids.ID, vdr ids.ShortID) { + if p.polled.Contains(vdr) { + p.polled.Remove(vdr) p.votes.Add(vote) } } @@ -106,13 +102,14 @@ func (p *poll) Vote(vote ids.ID) { // Finished returns true if the poll has completed, with no more required // responses func (p poll) Finished() bool { + remaining := p.polled.Len() received := p.votes.Len() _, freq := p.votes.Mode() - return p.numPolled == 0 || // All k nodes responded + return remaining == 0 || // All k nodes responded freq >= p.alpha || // An alpha majority has returned - received+p.numPolled < p.alpha // An alpha majority can never return + received+remaining < p.alpha // An alpha majority can never return } func (p poll) String() string { - return fmt.Sprintf("Waiting on %d chits", p.numPolled) + return fmt.Sprintf("Waiting on %d chits from %s", p.polled.Len(), p.polled) } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index e023a7d..9e97f0a 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -297,9 +297,12 @@ func (t *Transitive) pullSample(blkID ids.ID) { 
vdrSet.Add(vdr.ID()) } + toSample := ids.ShortSet{} + toSample.Union(vdrSet) + t.RequestID++ - if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) { - t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID) + if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) { + t.Config.Sender.PullQuery(toSample, t.RequestID, blkID) } else if numVdrs < p.K { t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blkID) } @@ -314,9 +317,12 @@ func (t *Transitive) pushSample(blk snowman.Block) { vdrSet.Add(vdr.ID()) } + toSample := ids.ShortSet{} + toSample.Union(vdrSet) + t.RequestID++ - if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) { - t.Config.Sender.PushQuery(vdrSet, t.RequestID, blk.ID(), blk.Bytes()) + if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) { + t.Config.Sender.PushQuery(toSample, t.RequestID, blk.ID(), blk.Bytes()) } else if numVdrs < p.K { t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", blk.ID()) } diff --git a/snow/engine/snowman/transitive_test.go b/snow/engine/snowman/transitive_test.go index 1920d8c..c97f2f0 100644 --- a/snow/engine/snowman/transitive_test.go +++ b/snow/engine/snowman/transitive_test.go @@ -1076,3 +1076,115 @@ func TestEngineRetryFetch(t *testing.T) { t.Fatalf("Should have requested the block again") } } + +func TestEngineDoubleChit(t *testing.T) { + config := DefaultConfig() + + config.Params = snowball.Parameters{ + Metrics: prometheus.NewRegistry(), + K: 2, + Alpha: 2, + BetaVirtuous: 1, + BetaRogue: 2, + } + + vdr0 := validators.GenerateRandomValidator(1) + vdr1 := validators.GenerateRandomValidator(1) + + vals := validators.NewSet() + config.Validators = vals + + vals.Add(vdr0) + vals.Add(vdr1) + + sender := &common.SenderTest{} + sender.T = t + config.Sender = sender + + sender.Default(true) + + vm := &VMTest{} + vm.T = t + config.VM = vm + + vm.Default(true) + vm.CantSetPreference = false + + gBlk := &Blk{ + id: GenerateID(), + status: choices.Accepted, + } + + vm.LastAcceptedF = func() ids.ID { return gBlk.ID() } + sender.CantGetAcceptedFrontier = false + + te := &Transitive{} + te.Initialize(config) + te.finishBootstrapping() + + vm.LastAcceptedF = nil + sender.CantGetAcceptedFrontier = true + + blk := &Blk{ + parent: gBlk, + id: GenerateID(), + status: choices.Processing, + bytes: []byte{1}, + } + + queried := new(bool) + queryRequestID := new(uint32) + sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, blkID ids.ID, blkBytes []byte) { + if *queried { + t.Fatalf("Asked multiple times") + } + *queried = true + *queryRequestID = requestID + vdrSet := ids.ShortSet{} + vdrSet.Add(vdr0.ID(), vdr1.ID()) + if !inVdrs.Equals(vdrSet) { + t.Fatalf("Asking wrong validator for preference") + } + if !blk.ID().Equals(blkID) { + t.Fatalf("Asking for wrong block") + } + } + + te.insert(blk) + + vm.GetBlockF = func(id ids.ID) (snowman.Block, error) { + switch { + case id.Equals(gBlk.ID()): + return gBlk, nil + case id.Equals(blk.ID()): + return blk, nil + } + t.Fatalf("Unknown block") + panic("Should have errored") + } + + blkSet := ids.Set{} + blkSet.Add(blk.ID()) + + if status := blk.Status(); status != choices.Processing { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr0.ID(), *queryRequestID, blkSet) + + if status := blk.Status(); status != choices.Processing { + t.Fatalf("Wrong status: %s ; expected: %s", 
status, choices.Processing) + } + + te.Chits(vdr0.ID(), *queryRequestID, blkSet) + + if status := blk.Status(); status != choices.Processing { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing) + } + + te.Chits(vdr1.ID(), *queryRequestID, blkSet) + + if status := blk.Status(); status != choices.Accepted { + t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Accepted) + } +} From 8c42f14a49f566983d059ddc8ce832eb027194d4 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Thu, 4 Jun 2020 01:57:43 -0400 Subject: [PATCH 02/15] Added ping pong messages --- network/builder.go | 6 ++++ network/commands.go | 9 +++++ network/metrics.go | 7 ++++ network/network.go | 10 ++++++ network/peer.go | 86 ++++++++++++++++++++++++++++++++++++++------- 5 files changed, 105 insertions(+), 13 deletions(-) diff --git a/network/builder.go b/network/builder.go index 20c7e4a..c7ba4aa 100644 --- a/network/builder.go +++ b/network/builder.go @@ -33,6 +33,12 @@ func (m Builder) PeerList(ipDescs []utils.IPDesc) (Msg, error) { return m.Pack(PeerList, map[Field]interface{}{Peers: ipDescs}) } +// Ping message +func (m Builder) Ping() (Msg, error) { return m.Pack(Ping, nil) } + +// Pong message +func (m Builder) Pong() (Msg, error) { return m.Pack(Pong, nil) } + // GetAcceptedFrontier message func (m Builder) GetAcceptedFrontier(chainID ids.ID, requestID uint32) (Msg, error) { return m.Pack(GetAcceptedFrontier, map[Field]interface{}{ diff --git a/network/commands.go b/network/commands.go index 177f58b..ab58a8d 100644 --- a/network/commands.go +++ b/network/commands.go @@ -125,6 +125,10 @@ func (op Op) String() string { return "get_peerlist" case PeerList: return "peerlist" + case Ping: + return "ping" + case Pong: + return "pong" case GetAcceptedFrontier: return "get_accepted_frontier" case AcceptedFrontier: @@ -166,6 +170,9 @@ const ( PushQuery PullQuery Chits + // Handshake: + Ping + Pong ) // Defines the messages that can be sent/received with this network @@ -176,6 +183,8 @@ var ( Version: []Field{NetworkID, NodeID, MyTime, IP, VersionStr}, GetPeerList: []Field{}, PeerList: []Field{Peers}, + Ping: []Field{}, + Pong: []Field{}, // Bootstrapping: GetAcceptedFrontier: []Field{ChainID, RequestID}, AcceptedFrontier: []Field{ChainID, RequestID, ContainerIDs}, diff --git a/network/metrics.go b/network/metrics.go index 3afda5b..400e27a 100644 --- a/network/metrics.go +++ b/network/metrics.go @@ -54,6 +54,7 @@ type metrics struct { getVersion, version, getPeerlist, peerlist, + ping, pong, getAcceptedFrontier, acceptedFrontier, getAccepted, accepted, get, put, @@ -78,6 +79,8 @@ func (m *metrics) initialize(registerer prometheus.Registerer) error { errs.Add(m.version.initialize(Version, registerer)) errs.Add(m.getPeerlist.initialize(GetPeerList, registerer)) errs.Add(m.peerlist.initialize(PeerList, registerer)) + errs.Add(m.ping.initialize(Ping, registerer)) + errs.Add(m.pong.initialize(Pong, registerer)) errs.Add(m.getAcceptedFrontier.initialize(GetAcceptedFrontier, registerer)) errs.Add(m.acceptedFrontier.initialize(AcceptedFrontier, registerer)) errs.Add(m.getAccepted.initialize(GetAccepted, registerer)) @@ -101,6 +104,10 @@ func (m *metrics) message(msgType Op) *messageMetrics { return &m.getPeerlist case PeerList: return &m.peerlist + case Ping: + return &m.ping + case Pong: + return &m.pong case GetAcceptedFrontier: return &m.getAcceptedFrontier case AcceptedFrontier: diff --git a/network/network.go b/network/network.go index 50471eb..85ce09d 100644 --- a/network/network.go +++ 
b/network/network.go @@ -39,6 +39,8 @@ const ( defaultGetVersionTimeout = 2 * time.Second defaultAllowPrivateIPs = true defaultGossipSize = 50 + defaultPingPongTimeout = time.Minute + defaultPingFrequency = 3 * defaultPingPongTimeout / 4 ) // Network defines the functionality of the networking library. @@ -113,6 +115,8 @@ type network struct { getVersionTimeout time.Duration allowPrivateIPs bool gossipSize int + pingPongTimeout time.Duration + pingFrequency time.Duration executor timer.Executor @@ -171,6 +175,8 @@ func NewDefaultNetwork( defaultGetVersionTimeout, defaultAllowPrivateIPs, defaultGossipSize, + defaultPingPongTimeout, + defaultPingFrequency, ) } @@ -200,6 +206,8 @@ func NewNetwork( getVersionTimeout time.Duration, allowPrivateIPs bool, gossipSize int, + pingPongTimeout time.Duration, + pingFrequency time.Duration, ) Network { net := &network{ log: log, @@ -226,6 +234,8 @@ func NewNetwork( getVersionTimeout: getVersionTimeout, allowPrivateIPs: allowPrivateIPs, gossipSize: gossipSize, + pingPongTimeout: pingPongTimeout, + pingFrequency: pingFrequency, disconnectedIPs: make(map[string]struct{}), connectedIPs: make(map[string]struct{}), diff --git a/network/peer.go b/network/peer.go index 5bb7601..394b838 100644 --- a/network/peer.go +++ b/network/peer.go @@ -60,6 +60,24 @@ func (p *peer) Start() { // Initially send the version to the peer go p.Version() go p.requestVersion() + go p.sendPings() +} + +func (p *peer) sendPings() { + t := time.NewTicker(p.net.pingFrequency) + defer t.Stop() + + for range t.C { + p.net.stateLock.Lock() + closed := p.closed + p.net.stateLock.Unlock() + + if closed { + return + } + + p.Ping() + } } // request the version from the peer until we get the version from them @@ -76,6 +94,7 @@ func (p *peer) requestVersion() { if connected || closed { return } + p.GetVersion() } } @@ -84,6 +103,11 @@ func (p *peer) requestVersion() { func (p *peer) ReadMessages() { defer p.Close() + if err := p.conn.SetReadDeadline(p.net.clock.Time().Add(p.net.pingPongTimeout)); err != nil { + p.net.log.Verbo("error on setting the connection read timeout %s", err) + return + } + pendingBuffer := wrappers.Packer{} readBuffer := make([]byte, 1<<10) for { @@ -196,7 +220,15 @@ func (p *peer) send(msg Msg) bool { // assumes the stateLock is not held func (p *peer) handle(msg Msg) { p.net.heartbeat() - atomic.StoreInt64(&p.lastReceived, p.net.clock.Time().Unix()) + + currentTime := p.net.clock.Time() + atomic.StoreInt64(&p.lastReceived, currentTime.Unix()) + + if err := p.conn.SetReadDeadline(currentTime.Add(p.net.pingPongTimeout)); err != nil { + p.net.log.Verbo("error on setting the connection read timeout %s, closing the connection", err) + p.Close() + return + } op := msg.Op() msgMetrics := p.net.message(op) @@ -213,6 +245,12 @@ func (p *peer) handle(msg Msg) { case GetVersion: p.getVersion(msg) return + case Ping: + p.ping(msg) + return + case Pong: + p.pong(msg) + return } if !p.connected { p.net.log.Debug("dropping message from %s because the connection hasn't been established yet", p.id) @@ -290,6 +328,12 @@ func (p *peer) GetPeerList() { p.Send(msg) } +// assumes the stateLock is not held +func (p *peer) SendPeerList() { + ips := p.net.validatorIPs() + p.PeerList(ips) +} + // assumes the stateLock is not held func (p *peer) PeerList(peers []utils.IPDesc) { msg, err := p.net.b.PeerList(peers) @@ -298,7 +342,28 @@ func (p *peer) PeerList(peers []utils.IPDesc) { return } p.Send(msg) - return +} + +// assumes the stateLock is not held +func (p *peer) Ping() { + msg, err := 
p.net.b.Ping() + p.net.log.AssertNoError(err) + if p.Send(msg) { + p.net.ping.numSent.Inc() + } else { + p.net.ping.numFailed.Inc() + } +} + +// assumes the stateLock is not held +func (p *peer) Pong() { + msg, err := p.net.b.Pong() + p.net.log.AssertNoError(err) + if p.Send(msg) { + p.net.pong.numSent.Inc() + } else { + p.net.pong.numFailed.Inc() + } } // assumes the stateLock is not held @@ -430,17 +495,6 @@ func (p *peer) version(msg Msg) { p.net.connected(p) } -// assumes the stateLock is not held -func (p *peer) SendPeerList() { - ips := p.net.validatorIPs() - reply, err := p.net.b.PeerList(ips) - if err != nil { - p.net.log.Warn("failed to send PeerList message due to %s", err) - return - } - p.Send(reply) -} - // assumes the stateLock is not held func (p *peer) getPeerList(_ Msg) { p.SendPeerList() } @@ -460,6 +514,12 @@ func (p *peer) peerList(msg Msg) { p.net.stateLock.Unlock() } +// assumes the stateLock is not held +func (p *peer) ping(_ Msg) { p.Pong() } + +// assumes the stateLock is not held +func (p *peer) pong(_ Msg) {} + // assumes the stateLock is not held func (p *peer) getAcceptedFrontier(msg Msg) { chainID, err := ids.ToID(msg.Get(ChainID).([]byte)) From 3a4ffb48505cce59b9a0e5465bca1526c5fd7766 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Mon, 8 Jun 2020 20:30:03 -0400 Subject: [PATCH 03/15] lower log level for gossiped put messages --- network/network.go | 6 +++++- snow/engine/avalanche/transitive.go | 6 +++++- snow/engine/snowman/transitive.go | 6 +++++- 3 files changed, 15 insertions(+), 3 deletions(-) diff --git a/network/network.go b/network/network.go index 9780c07..7bfc4f7 100644 --- a/network/network.go +++ b/network/network.go @@ -39,6 +39,10 @@ const ( defaultGetVersionTimeout = 2 * time.Second defaultAllowPrivateIPs = true defaultGossipSize = 50 + + // Request ID used when sending a Put message to gossip an accepted container + // (ie not sent in response to a Get) + GossipMsgRequestID = math.MaxUint32 ) // Network defines the functionality of the networking library. @@ -620,7 +624,7 @@ func (n *network) Track(ip utils.IPDesc) { // assumes the stateLock is not held. 
func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) error { - msg, err := n.b.Put(chainID, math.MaxUint32, containerID, container) + msg, err := n.b.Put(chainID, GossipMsgRequestID, containerID, container) if err != nil { return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container)) } diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index e48b167..37f0c9a 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -169,7 +169,11 @@ func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, vtxID ids.ID, vtxByt t.Config.Context.Log.Verbo("Put(%s, %d, %s) called", vdr, requestID, vtxID) if !t.bootstrapped { // Bootstrapping unfinished --> didn't call Get --> this message is invalid - t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID) + if requestID == network.GossipMsgRequestID { + t.Config.Context.Log.Verbo("dropping gossip Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID) + } else { + t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, vtxID) + } return nil } diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index f155f5e..5461c44 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -185,7 +185,11 @@ func (t *Transitive) GetAncestors(vdr ids.ShortID, requestID uint32, blkID ids.I func (t *Transitive) Put(vdr ids.ShortID, requestID uint32, blkID ids.ID, blkBytes []byte) error { // bootstrapping isn't done --> we didn't send any gets --> this put is invalid if !t.bootstrapped { - t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID) + if requestID == network.GossipMsgRequestID { + t.Config.Context.Log.Verbo("dropping gossip Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID) + } else { + t.Config.Context.Log.Debug("dropping Put(%s, %d, %s) due to bootstrapping", vdr, requestID, blkID) + } return nil } From f52d0c29bd35574714f5e5a132a4c77215c208fe Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Thu, 11 Jun 2020 18:00:21 -0400 Subject: [PATCH 04/15] Register a timeout for querying ourselves to ensure we never drop a query --- go.mod | 1 + go.sum | 4 + snow/engine/snowman/transitive.go | 1 - snow/networking/router/handler.go | 166 ++++++++++++++---------- snow/networking/router/subnet_router.go | 51 ++------ snow/networking/sender/sender.go | 73 ++++++----- snow/networking/sender/sender_test.go | 126 ++++++++++++++++++ 7 files changed, 280 insertions(+), 142 deletions(-) diff --git a/go.mod b/go.mod index 4636c8c..a8a8f39 100644 --- a/go.mod +++ b/go.mod @@ -33,6 +33,7 @@ require ( github.com/olekukonko/tablewriter v0.0.4 // indirect github.com/pborman/uuid v1.2.0 // indirect github.com/prometheus/client_golang v1.6.0 + github.com/prometheus/common v0.9.1 github.com/prometheus/tsdb v0.10.0 // indirect github.com/rjeczalik/notify v0.9.2 // indirect github.com/rs/cors v1.7.0 diff --git a/go.sum b/go.sum index 774be35..4bb3270 100644 --- a/go.sum +++ b/go.sum @@ -7,8 +7,10 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU= github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod 
h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= +github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= +github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E= github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc= github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= @@ -217,6 +219,7 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU= github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 h1:Oo2KZNP70KE0+IUJSidPj/BFS/RXNHmKIJOdckzml2E= @@ -336,6 +339,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo= google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM= google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU= +gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/snow/engine/snowman/transitive.go b/snow/engine/snowman/transitive.go index f155f5e..2c6267c 100644 --- a/snow/engine/snowman/transitive.go +++ b/snow/engine/snowman/transitive.go @@ -579,7 +579,6 @@ func (t *Transitive) pushSample(blk snowman.Block) { } t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes()) - return } func (t *Transitive) deliver(blk snowman.Block) error { diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 8b40223..3f1e21f 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -4,6 +4,7 @@ package router import ( + "sync" "time" "github.com/ava-labs/gecko/ids" @@ -17,12 +18,18 @@ import ( type Handler struct { metrics - msgs chan message - closed chan struct{} - engine common.Engine - msgChan <-chan common.Message + msgs chan message + reliableMsgsSema chan struct{} + reliableMsgsLock sync.Mutex + reliableMsgs []message + closed chan struct{} + msgChan <-chan common.Message + + ctx *snow.Context + engine common.Engine toClose func() + closing bool } // Initialize this consensus handler @@ -35,9 +42,12 @@ func (h *Handler) Initialize( ) { h.metrics.Initialize(namespace, metrics) h.msgs = 
make(chan message, bufferSize) + h.reliableMsgsSema = make(chan struct{}, 1) h.closed = make(chan struct{}) - h.engine = engine h.msgChan = msgChan + + h.ctx = engine.Context() + h.engine = engine } // Context of this Handler @@ -46,37 +56,38 @@ func (h *Handler) Context() *snow.Context { return h.engine.Context() } // Dispatch waits for incoming messages from the network // and, when they arrive, sends them to the consensus engine func (h *Handler) Dispatch() { - log := h.Context().Log defer func() { - log.Info("finished shutting down chain") + h.ctx.Log.Info("finished shutting down chain") close(h.closed) }() - closing := false for { select { case msg, ok := <-h.msgs: if !ok { + // the msgs channel has been closed, so this dispatcher should exit return } + h.metrics.pending.Dec() - if closing { - log.Debug("dropping message due to closing:\n%s", msg) - continue - } - if h.dispatchMsg(msg) { - closing = true + h.dispatchMsg(msg) + case <-h.reliableMsgsSema: + // get all the reliable messages + h.reliableMsgsLock.Lock() + msgs := h.reliableMsgs + h.reliableMsgs = nil + h.reliableMsgsLock.Unlock() + + // fire all the reliable messages + for _, msg := range msgs { + h.metrics.pending.Dec() + h.dispatchMsg(msg) } case msg := <-h.msgChan: - if closing { - log.Debug("dropping internal message due to closing:\n%s", msg) - continue - } - if h.dispatchMsg(message{messageType: notifyMsg, notification: msg}) { - closing = true - } + // handle a message from the VM + h.dispatchMsg(message{messageType: notifyMsg, notification: msg}) } - if closing && h.toClose != nil { + if h.closing && h.toClose != nil { go h.toClose() } } @@ -85,14 +96,19 @@ func (h *Handler) Dispatch() { // Dispatch a message to the consensus engine. // Returns true iff this consensus handler (and its associated engine) should shutdown // (due to receipt of a shutdown message) -func (h *Handler) dispatchMsg(msg message) bool { +func (h *Handler) dispatchMsg(msg message) { + if h.closing { + h.ctx.Log.Debug("dropping message due to closing:\n%s", msg) + h.metrics.dropped.Inc() + return + } + startTime := time.Now() - ctx := h.engine.Context() - ctx.Lock.Lock() - defer ctx.Lock.Unlock() + h.ctx.Lock.Lock() + defer h.ctx.Lock.Unlock() - ctx.Log.Verbo("Forwarding message to consensus: %s", msg) + h.ctx.Log.Verbo("Forwarding message to consensus: %s", msg) var ( err error done bool @@ -159,9 +175,10 @@ func (h *Handler) dispatchMsg(msg message) bool { } if err != nil { - ctx.Log.Fatal("forcing chain to shutdown due to %s", err) + h.ctx.Log.Fatal("forcing chain to shutdown due to %s", err) } - return done || err != nil + + h.closing = done || err != nil } // GetAcceptedFrontier passes a GetAcceptedFrontier message received from the @@ -187,8 +204,8 @@ func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, co // GetAcceptedFrontierFailed passes a GetAcceptedFrontierFailed message received // from the network to the consensus engine. -func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) bool { - return h.sendMsg(message{ +func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) { + h.sendReliableMsg(message{ messageType: getAcceptedFrontierFailedMsg, validatorID: validatorID, requestID: requestID, @@ -219,14 +236,43 @@ func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerI // GetAcceptedFailed passes a GetAcceptedFailed message received from the // network to the consensus engine. 
-func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) bool { - return h.sendMsg(message{ +func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) { + h.sendReliableMsg(message{ messageType: getAcceptedFailedMsg, validatorID: validatorID, requestID: requestID, }) } +// GetAncestors passes a GetAncestors message received from the network to the consensus engine. +func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool { + return h.sendMsg(message{ + messageType: getAncestorsMsg, + validatorID: validatorID, + requestID: requestID, + containerID: containerID, + }) +} + +// MultiPut passes a MultiPut message received from the network to the consensus engine. +func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) bool { + return h.sendMsg(message{ + messageType: multiPutMsg, + validatorID: validatorID, + requestID: requestID, + containers: containers, + }) +} + +// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine. +func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) { + h.sendReliableMsg(message{ + messageType: getAncestorsFailedMsg, + validatorID: validatorID, + requestID: requestID, + }) +} + // Get passes a Get message received from the network to the consensus engine. func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool { return h.sendMsg(message{ @@ -237,16 +283,6 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids }) } -// GetAncestors passes a GetAncestors message received from the network to the consensus engine. -func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool { - return h.sendMsg(message{ - messageType: getAncestorsMsg, - validatorID: validatorID, - requestID: requestID, - containerID: containerID, - }) -} - // Put passes a Put message received from the network to the consensus engine. func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) bool { return h.sendMsg(message{ @@ -258,34 +294,15 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids }) } -// MultiPut passes a MultiPut message received from the network to the consensus engine. -func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) bool { - return h.sendMsg(message{ - messageType: multiPutMsg, - validatorID: validatorID, - requestID: requestID, - containers: containers, - }) -} - // GetFailed passes a GetFailed message to the consensus engine. -func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) bool { - return h.sendMsg(message{ +func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) { + h.sendReliableMsg(message{ messageType: getFailedMsg, validatorID: validatorID, requestID: requestID, }) } -// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine. -func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) bool { - return h.sendMsg(message{ - messageType: getAncestorsFailedMsg, - validatorID: validatorID, - requestID: requestID, - }) -} - // PushQuery passes a PushQuery message received from the network to the consensus engine. 
func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) bool { return h.sendMsg(message{ @@ -318,8 +335,8 @@ func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set } // QueryFailed passes a QueryFailed message received from the network to the consensus engine. -func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) bool { - return h.sendMsg(message{ +func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) { + h.sendReliableMsg(message{ messageType: queryFailedMsg, validatorID: validatorID, requestID: requestID, @@ -341,8 +358,9 @@ func (h *Handler) Notify(msg common.Message) bool { // Shutdown shuts down the dispatcher func (h *Handler) Shutdown() { - h.metrics.pending.Inc() - h.msgs <- message{messageType: shutdownMsg} + h.sendReliableMsg(message{ + messageType: shutdownMsg, + }) } func (h *Handler) sendMsg(msg message) bool { @@ -355,3 +373,15 @@ func (h *Handler) sendMsg(msg message) bool { return false } } + +func (h *Handler) sendReliableMsg(msg message) { + h.reliableMsgsLock.Lock() + defer h.reliableMsgsLock.Unlock() + + h.metrics.pending.Inc() + h.reliableMsgs = append(h.reliableMsgs, msg) + select { + case h.reliableMsgsSema <- struct{}{}: + default: + } +} diff --git a/snow/networking/router/subnet_router.go b/snow/networking/router/subnet_router.go index 5bf977c..731dcd6 100644 --- a/snow/networking/router/subnet_router.go +++ b/snow/networking/router/subnet_router.go @@ -122,19 +122,12 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI sr.lock.RLock() defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) if chain, exists := sr.chains[chainID.Key()]; exists { - if !chain.GetAcceptedFrontierFailed(validatorID, requestID) { - sr.log.Debug("deferring GetAcceptedFrontier timeout due to a full queue on %s", chainID) - // Defer this call to later - sr.timeouts.Register(validatorID, chainID, requestID, func() { - sr.GetAcceptedFrontierFailed(validatorID, chainID, requestID) - }) - return - } + chain.GetAcceptedFrontierFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) } - sr.timeouts.Cancel(validatorID, chainID, requestID) } // GetAccepted routes an incoming GetAccepted request from the @@ -174,18 +167,12 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID sr.lock.RLock() defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) if chain, exists := sr.chains[chainID.Key()]; exists { - if !chain.GetAcceptedFailed(validatorID, requestID) { - sr.timeouts.Register(validatorID, chainID, requestID, func() { - sr.log.Debug("deferring GetAccepted timeout due to a full queue on %s", chainID) - sr.GetAcceptedFailed(validatorID, chainID, requestID) - }) - return - } + chain.GetAcceptedFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) } - sr.timeouts.Cancel(validatorID, chainID, requestID) } // GetAncestors routes an incoming GetAncestors message from the validator with ID [validatorID] @@ -225,18 +212,12 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I sr.lock.RLock() defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) if chain, exists := sr.chains[chainID.Key()]; exists { - if !chain.GetAncestorsFailed(validatorID, requestID) { - sr.timeouts.Register(validatorID, chainID, 
requestID, func() { - sr.log.Debug("deferring GetAncestors timeout due to a full queue on %s", chainID) - sr.GetAncestorsFailed(validatorID, chainID, requestID) - }) - return - } + chain.GetAncestorsFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) } - sr.timeouts.Cancel(validatorID, chainID, requestID) } // Get routes an incoming Get request from the validator with ID [validatorID] @@ -275,18 +256,12 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques sr.lock.RLock() defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) if chain, exists := sr.chains[chainID.Key()]; exists { - if !chain.GetFailed(validatorID, requestID) { - sr.timeouts.Register(validatorID, chainID, requestID, func() { - sr.log.Debug("deferring Get timeout due to a full queue on %s", chainID) - sr.GetFailed(validatorID, chainID, requestID) - }) - return - } + chain.GetFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) } - sr.timeouts.Cancel(validatorID, chainID, requestID) } // PushQuery routes an incoming PushQuery request from the validator with ID [validatorID] @@ -337,18 +312,12 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ sr.lock.RLock() defer sr.lock.RUnlock() + sr.timeouts.Cancel(validatorID, chainID, requestID) if chain, exists := sr.chains[chainID.Key()]; exists { - if !chain.QueryFailed(validatorID, requestID) { - sr.timeouts.Register(validatorID, chainID, requestID, func() { - sr.log.Debug("deferring Query timeout due to a full queue on %s", chainID) - sr.QueryFailed(validatorID, chainID, requestID) - }) - return - } + chain.QueryFailed(validatorID, requestID) } else { sr.log.Debug("message referenced a chain, %s, this node doesn't validate", chainID) } - sr.timeouts.Cancel(validatorID, chainID, requestID) } // Shutdown shuts down this router diff --git a/snow/networking/sender/sender.go b/snow/networking/sender/sender.go index e81c5c0..92c02b8 100644 --- a/snow/networking/sender/sender.go +++ b/snow/networking/sender/sender.go @@ -31,17 +31,16 @@ func (s *Sender) Context() *snow.Context { return s.ctx } // GetAcceptedFrontier ... 
func (s *Sender) GetAcceptedFrontier(validatorIDs ids.ShortSet, requestID uint32) { - if validatorIDs.Contains(s.ctx.NodeID) { - validatorIDs.Remove(s.ctx.NodeID) - go s.router.GetAcceptedFrontier(s.ctx.NodeID, s.ctx.ChainID, requestID) - } - validatorList := validatorIDs.List() - for _, validatorID := range validatorList { + for _, validatorID := range validatorIDs.List() { vID := validatorID s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { s.router.GetAcceptedFrontierFailed(vID, s.ctx.ChainID, requestID) }) } + if validatorIDs.Contains(s.ctx.NodeID) { + validatorIDs.Remove(s.ctx.NodeID) + go s.router.GetAcceptedFrontier(s.ctx.NodeID, s.ctx.ChainID, requestID) + } s.sender.GetAcceptedFrontier(validatorIDs, s.ctx.ChainID, requestID) } @@ -49,24 +48,23 @@ func (s *Sender) GetAcceptedFrontier(validatorIDs ids.ShortSet, requestID uint32 func (s *Sender) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) { if validatorID.Equals(s.ctx.NodeID) { go s.router.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs) - return + } else { + s.sender.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs) } - s.sender.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs) } // GetAccepted ... func (s *Sender) GetAccepted(validatorIDs ids.ShortSet, requestID uint32, containerIDs ids.Set) { - if validatorIDs.Contains(s.ctx.NodeID) { - validatorIDs.Remove(s.ctx.NodeID) - go s.router.GetAccepted(s.ctx.NodeID, s.ctx.ChainID, requestID, containerIDs) - } - validatorList := validatorIDs.List() - for _, validatorID := range validatorList { + for _, validatorID := range validatorIDs.List() { vID := validatorID s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { s.router.GetAcceptedFailed(vID, s.ctx.ChainID, requestID) }) } + if validatorIDs.Contains(s.ctx.NodeID) { + validatorIDs.Remove(s.ctx.NodeID) + go s.router.GetAccepted(s.ctx.NodeID, s.ctx.ChainID, requestID, containerIDs) + } s.sender.GetAccepted(validatorIDs, s.ctx.ChainID, requestID, containerIDs) } @@ -74,9 +72,9 @@ func (s *Sender) GetAccepted(validatorIDs ids.ShortSet, requestID uint32, contai func (s *Sender) Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) { if validatorID.Equals(s.ctx.NodeID) { go s.router.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs) - return + } else { + s.sender.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs) } - s.sender.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs) } // Get sends a Get message to the consensus engine running on the specified @@ -85,6 +83,13 @@ func (s *Sender) Accepted(validatorID ids.ShortID, requestID uint32, containerID // specified container. func (s *Sender) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) { s.ctx.Log.Verbo("Sending Get to validator %s. RequestID: %d. 
ContainerID: %s", validatorID, requestID, containerID) + + // Sending a Get to myself will always fail + if validatorID.Equals(s.ctx.NodeID) { + go s.router.GetFailed(validatorID, s.ctx.ChainID, requestID) + return + } + // Add a timeout -- if we don't get a response before the timeout expires, // send this consensus engine a GetFailed message s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { @@ -101,6 +106,7 @@ func (s *Sender) GetAncestors(validatorID ids.ShortID, requestID uint32, contain go s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID) return } + s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID) }) @@ -130,6 +136,13 @@ func (s *Sender) MultiPut(validatorID ids.ShortID, requestID uint32, containers // their preferred frontier given the existence of the specified container. func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containerID ids.ID, container []byte) { s.ctx.Log.Verbo("Sending PushQuery to validators %v. RequestID: %d. ContainerID: %s", validatorIDs, requestID, containerID) + for _, validatorID := range validatorIDs.List() { + vID := validatorID + s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { + s.router.QueryFailed(vID, s.ctx.ChainID, requestID) + }) + } + // If one of the validators in [validatorIDs] is myself, send this message directly // to my own router rather than sending it over the network if validatorIDs.Contains(s.ctx.NodeID) { // One of the validators in [validatorIDs] was myself @@ -139,13 +152,7 @@ func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containe // If this were not a goroutine, then we would deadlock here when [handler].msgs is full go s.router.PushQuery(s.ctx.NodeID, s.ctx.ChainID, requestID, containerID, container) } - validatorList := validatorIDs.List() // Convert set to list for easier iteration - for _, validatorID := range validatorList { - vID := validatorID - s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { - s.router.QueryFailed(vID, s.ctx.ChainID, requestID) - }) - } + s.sender.PushQuery(validatorIDs, s.ctx.ChainID, requestID, containerID, container) } @@ -155,6 +162,14 @@ func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containe // their preferred frontier. func (s *Sender) PullQuery(validatorIDs ids.ShortSet, requestID uint32, containerID ids.ID) { s.ctx.Log.Verbo("Sending PullQuery. RequestID: %d. 
ContainerID: %s", requestID, containerID) + + for _, validatorID := range validatorIDs.List() { + vID := validatorID + s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { + s.router.QueryFailed(vID, s.ctx.ChainID, requestID) + }) + } + // If one of the validators in [validatorIDs] is myself, send this message directly // to my own router rather than sending it over the network if validatorIDs.Contains(s.ctx.NodeID) { // One of the validators in [validatorIDs] was myself @@ -164,13 +179,7 @@ func (s *Sender) PullQuery(validatorIDs ids.ShortSet, requestID uint32, containe // If this were not a goroutine, then we would deadlock when [handler].msgs is full go s.router.PullQuery(s.ctx.NodeID, s.ctx.ChainID, requestID, containerID) } - validatorList := validatorIDs.List() // Convert set to list for easier iteration - for _, validatorID := range validatorList { - vID := validatorID - s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() { - s.router.QueryFailed(vID, s.ctx.ChainID, requestID) - }) - } + s.sender.PullQuery(validatorIDs, s.ctx.ChainID, requestID, containerID) } @@ -181,9 +190,9 @@ func (s *Sender) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set) // to my own router rather than sending it over the network if validatorID.Equals(s.ctx.NodeID) { go s.router.Chits(validatorID, s.ctx.ChainID, requestID, votes) - return + } else { + s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes) } - s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes) } // Gossip the provided container diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 7760307..7c7cabc 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -4,6 +4,7 @@ package sender import ( + "math/rand" "reflect" "sync" "testing" @@ -82,3 +83,128 @@ func TestTimeout(t *testing.T) { t.Fatalf("Timeouts should have fired") } } + +func TestReliableMessages(t *testing.T) { + tm := timeout.Manager{} + tm.Initialize(50 * time.Millisecond) + go tm.Dispatch() + + chainRouter := router.ChainRouter{} + chainRouter.Initialize(logging.NoLog{}, &tm, time.Hour, time.Second) + + sender := Sender{} + sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &chainRouter, &tm) + + engine := common.EngineTest{T: t} + engine.Default(true) + + engine.ContextF = snow.DefaultContextTest + engine.GossipF = func() error { return nil } + + queriesToSend := 1000 + awaiting := make([]chan struct{}, queriesToSend) + for i := 0; i < queriesToSend; i++ { + awaiting[i] = make(chan struct{}, 1) + } + + engine.QueryFailedF = func(validatorID ids.ShortID, reqID uint32) error { + close(awaiting[int(reqID)]) + return nil + } + + handler := router.Handler{} + handler.Initialize( + &engine, + nil, + 1, + "", + prometheus.NewRegistry(), + ) + go handler.Dispatch() + + chainRouter.AddChain(&handler) + + go func() { + for i := 0; i < queriesToSend; i++ { + vdrIDs := ids.ShortSet{} + vdrIDs.Add(ids.NewShortID([20]byte{1})) + + sender.PullQuery(vdrIDs, uint32(i), ids.Empty) + time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond))) + } + }() + + go func() { + for { + chainRouter.Gossip() + time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond))) + } + }() + + for _, await := range awaiting { + _, _ = <-await + } +} + +func TestReliableMessagesToMyself(t *testing.T) { + tm := timeout.Manager{} + tm.Initialize(50 * time.Millisecond) + go tm.Dispatch() + + chainRouter := router.ChainRouter{} + 
chainRouter.Initialize(logging.NoLog{}, &tm, time.Hour, time.Second) + + sender := Sender{} + sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &chainRouter, &tm) + + engine := common.EngineTest{T: t} + engine.Default(false) + + engine.ContextF = snow.DefaultContextTest + engine.GossipF = func() error { return nil } + engine.CantPullQuery = false + + queriesToSend := 2 + awaiting := make([]chan struct{}, queriesToSend) + for i := 0; i < queriesToSend; i++ { + awaiting[i] = make(chan struct{}, 1) + } + + engine.QueryFailedF = func(validatorID ids.ShortID, reqID uint32) error { + close(awaiting[int(reqID)]) + return nil + } + + handler := router.Handler{} + handler.Initialize( + &engine, + nil, + 1, + "", + prometheus.NewRegistry(), + ) + go handler.Dispatch() + + chainRouter.AddChain(&handler) + + go func() { + for i := 0; i < queriesToSend; i++ { + vdrIDs := ids.ShortSet{} + vdrIDs.Add(engine.Context().NodeID) + + sender.PullQuery(vdrIDs, uint32(i), ids.Empty) + time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond))) + } + }() + + go func() { + for { + chainRouter.Gossip() + time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond))) + } + }() + + for _, await := range awaiting { + _, _ = <-await + } +} From 960377e2b4227df17bdfdfca62c934c8598a1f38 Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Thu, 11 Jun 2020 18:08:42 -0400 Subject: [PATCH 05/15] cleaned up imports --- snow/networking/router/handler.go | 3 ++- snow/networking/sender/sender_test.go | 3 ++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/snow/networking/router/handler.go b/snow/networking/router/handler.go index 3f1e21f..9d45baf 100644 --- a/snow/networking/router/handler.go +++ b/snow/networking/router/handler.go @@ -7,10 +7,11 @@ import ( "sync" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" - "github.com/prometheus/client_golang/prometheus" ) // Handler passes incoming messages from the network to the consensus engine diff --git a/snow/networking/sender/sender_test.go b/snow/networking/sender/sender_test.go index 7c7cabc..8be5e99 100644 --- a/snow/networking/sender/sender_test.go +++ b/snow/networking/sender/sender_test.go @@ -10,13 +10,14 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/snow" "github.com/ava-labs/gecko/snow/engine/common" "github.com/ava-labs/gecko/snow/networking/router" "github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/utils/logging" - "github.com/prometheus/client_golang/prometheus" ) func TestSenderContext(t *testing.T) { From 0fdddae9fcb290a66a9843d6cd857ba3f2646dbb Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Tue, 16 Jun 2020 11:53:57 -0400 Subject: [PATCH 06/15] Optimize DAG traversal in insertFrom --- snow/engine/avalanche/transitive.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index e48b167..82ee859 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -335,10 +335,10 @@ func (t *Transitive) reinsertFrom(vdr ids.ShortID, vtxID ids.ID) (bool, error) { func (t *Transitive) insertFrom(vdr ids.ShortID, vtx avalanche.Vertex) (bool, error) { issued := true - vts := []avalanche.Vertex{vtx} - for len(vts) > 0 { - vtx := vts[0] - vts = vts[1:] + 
vertexHeap := newMaxVertexHeap() + vertexHeap.Push(vtx) + for vertexHeap.Len() > 0 { + vtx := vertexHeap.Pop() if t.Consensus.VertexIssued(vtx) { continue @@ -353,7 +353,7 @@ func (t *Transitive) insertFrom(vdr ids.ShortID, vtx avalanche.Vertex) (bool, er t.sendRequest(vdr, parent.ID()) issued = false } else { - vts = append(vts, parent) + vertexHeap.Push(parent) } } From 8fdeef5eb64f4459ec5f3baf65ccc0e4c90a4aa8 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 16 Jun 2020 15:17:13 -0400 Subject: [PATCH 07/15] pre-allocate slices for List in set, bag, shortSet --- ids/bag.go | 17 ++++++++++--- ids/bag_benchmark_test.go | 53 +++++++++++++++++++++++++++++++++++++++ ids/bag_test.go | 4 +-- ids/set.go | 14 +++++++++-- ids/set_benchmark_test.go | 53 +++++++++++++++++++++++++++++++++++++++ ids/short_set.go | 13 ++++++++-- 6 files changed, 144 insertions(+), 10 deletions(-) create mode 100644 ids/bag_benchmark_test.go create mode 100644 ids/set_benchmark_test.go diff --git a/ids/bag.go b/ids/bag.go index de0af46..1d16c64 100644 --- a/ids/bag.go +++ b/ids/bag.go @@ -8,6 +8,10 @@ import ( "strings" ) +const ( + minBagSize = 16 +) + // Bag is a multiset of IDs. // // A bag has the ability to split and filter on it's bits for ease of use for @@ -25,7 +29,7 @@ type Bag struct { func (b *Bag) init() { if b.counts == nil { - b.counts = make(map[[32]byte]int) + b.counts = make(map[[32]byte]int, minBagSize) } } @@ -72,16 +76,21 @@ func (b *Bag) AddCount(id ID, count int) { } // Count returns the number of times the id has been added. -func (b *Bag) Count(id ID) int { return b.counts[*id.ID] } +func (b *Bag) Count(id ID) int { + b.init() + return b.counts[*id.ID] +} // Len returns the number of times an id has been added. func (b *Bag) Len() int { return b.size } // List returns a list of all ids that have been added. 
func (b *Bag) List() []ID { - idList := []ID(nil) + idList := make([]ID, len(b.counts), len(b.counts)) + i := 0 for id := range b.counts { - idList = append(idList, NewID(id)) + idList[i] = NewID(id) + i++ } return idList } diff --git a/ids/bag_benchmark_test.go b/ids/bag_benchmark_test.go new file mode 100644 index 0000000..e856505 --- /dev/null +++ b/ids/bag_benchmark_test.go @@ -0,0 +1,53 @@ +package ids + +import ( + "crypto/rand" + "testing" +) + +// +func BenchmarkBagListSmall(b *testing.B) { + smallLen := 5 + bag := Bag{} + for i := 0; i < smallLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + NewID(idBytes) + bag.Add(NewID(idBytes)) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + bag.List() + } +} + +func BenchmarkBagListMedium(b *testing.B) { + mediumLen := 25 + bag := Bag{} + for i := 0; i < mediumLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + NewID(idBytes) + bag.Add(NewID(idBytes)) + } + b.ResetTimer() + + for n := 0; n < b.N; n++ { + bag.List() + } +} + +func BenchmarkBagListLarsge(b *testing.B) { + largeLen := 100000 + bag := Bag{} + for i := 0; i < largeLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + NewID(idBytes) + bag.Add(NewID(idBytes)) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + bag.List() + } +} diff --git a/ids/bag_test.go b/ids/bag_test.go index ed35233..af0965b 100644 --- a/ids/bag_test.go +++ b/ids/bag_test.go @@ -18,8 +18,8 @@ func TestBagAdd(t *testing.T) { } else if count := bag.Count(id1); count != 0 { t.Fatalf("Bag.Count returned %d expected %d", count, 0) } else if size := bag.Len(); size != 0 { - t.Fatalf("Bag.Len returned %d expected %d", count, 0) - } else if list := bag.List(); list != nil { + t.Fatalf("Bag.Len returned %d elements expected %d", count, 0) + } else if list := bag.List(); len(list) != 0 { t.Fatalf("Bag.List returned %v expected %v", list, nil) } else if mode, freq := bag.Mode(); !mode.IsZero() { t.Fatalf("Bag.Mode[0] returned %s expected %s", mode, ID{}) diff --git a/ids/set.go b/ids/set.go index 9d0b1ec..c3aa024 100644 --- a/ids/set.go +++ b/ids/set.go @@ -7,11 +7,19 @@ import ( "strings" ) +const ( + // The minimum capacity of a set + minSetSize = 16 +) + // Set is a set of IDs type Set map[[32]byte]bool func (ids *Set) init(size int) { if *ids == nil { + if minSetSize > size { + size = minSetSize + } *ids = make(map[[32]byte]bool, size) } } @@ -70,9 +78,11 @@ func (ids *Set) Clear() { *ids = nil } // List converts this set into a list func (ids Set) List() []ID { - idList := []ID(nil) + idList := make([]ID, ids.Len(), ids.Len()) + i := 0 for id := range ids { - idList = append(idList, NewID(id)) + idList[i] = NewID(id) + i++ } return idList } diff --git a/ids/set_benchmark_test.go b/ids/set_benchmark_test.go new file mode 100644 index 0000000..c88b4f9 --- /dev/null +++ b/ids/set_benchmark_test.go @@ -0,0 +1,53 @@ +package ids + +import ( + "crypto/rand" + "testing" +) + +// +func BenchmarkSetListSmall(b *testing.B) { + smallLen := 5 + set := Set{} + for i := 0; i < smallLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + NewID(idBytes) + set.Add(NewID(idBytes)) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + set.List() + } +} + +func BenchmarkSetListMedium(b *testing.B) { + mediumLen := 25 + set := Set{} + for i := 0; i < mediumLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + NewID(idBytes) + set.Add(NewID(idBytes)) + } + b.ResetTimer() + + for n := 0; n < b.N; n++ { + set.List() + } +} + +func BenchmarkSetListLarsge(b *testing.B) { + largeLen := 100000 + set := Set{} 
+ for i := 0; i < largeLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + NewID(idBytes) + set.Add(NewID(idBytes)) + } + b.ResetTimer() + for n := 0; n < b.N; n++ { + set.List() + } +} diff --git a/ids/short_set.go b/ids/short_set.go index 690cc3a..6977863 100644 --- a/ids/short_set.go +++ b/ids/short_set.go @@ -5,11 +5,18 @@ package ids import "strings" +const ( + minShortSetSize = 16 +) + // ShortSet is a set of ShortIDs type ShortSet map[[20]byte]bool func (ids *ShortSet) init(size int) { if *ids == nil { + if minShortSetSize > size { + size = minShortSetSize + } *ids = make(map[[20]byte]bool, size) } } @@ -65,9 +72,11 @@ func (ids ShortSet) CappedList(size int) []ShortID { // List converts this set into a list func (ids ShortSet) List() []ShortID { - idList := make([]ShortID, len(ids))[:0] + idList := make([]ShortID, len(ids), len(ids)) + i := 0 for id := range ids { - idList = append(idList, NewShortID(id)) + idList[i] = NewShortID(id) + i++ } return idList } From 191cd485935bc1b90d1bd445c248fea03af3becf Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 16 Jun 2020 15:34:34 -0400 Subject: [PATCH 08/15] add minimum map size to Blocker --- snow/events/blocker.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/snow/events/blocker.go b/snow/events/blocker.go index 6bfdd7b..3491b24 100644 --- a/snow/events/blocker.go +++ b/snow/events/blocker.go @@ -10,12 +10,16 @@ import ( "github.com/ava-labs/gecko/ids" ) +const ( + minBlockerSize = 16 +) + // Blocker tracks objects that are blocked type Blocker map[[32]byte][]Blockable func (b *Blocker) init() { if *b == nil { - *b = make(map[[32]byte][]Blockable) + *b = make(map[[32]byte][]Blockable, minBlockerSize) } } From 4ecd92efba77399ce0c5637ccbe56b3a845fa5d2 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 16 Jun 2020 15:43:30 -0400 Subject: [PATCH 09/15] add minimum size to uniqueBag and Requests --- ids/unique_bag.go | 6 +++++- snow/engine/common/requests.go | 8 ++++++-- 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/ids/unique_bag.go b/ids/unique_bag.go index d5d3e36..461a6bc 100644 --- a/ids/unique_bag.go +++ b/ids/unique_bag.go @@ -8,12 +8,16 @@ import ( "strings" ) +const ( + minUniqueBagSize = 16 +) + // UniqueBag ... type UniqueBag map[[32]byte]BitSet func (b *UniqueBag) init() { if *b == nil { - *b = make(map[[32]byte]BitSet) + *b = make(map[[32]byte]BitSet, minUniqueBagSize) } } diff --git a/snow/engine/common/requests.go b/snow/engine/common/requests.go index 22d5759..51f6cb3 100644 --- a/snow/engine/common/requests.go +++ b/snow/engine/common/requests.go @@ -7,6 +7,10 @@ import ( "github.com/ava-labs/gecko/ids" ) +const ( + minRequestsSize = 32 +) + type req struct { vdr ids.ShortID id uint32 @@ -22,7 +26,7 @@ type Requests struct { // are only in one request at a time. 
func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) { if r.reqsToID == nil { - r.reqsToID = make(map[[20]byte]map[uint32]ids.ID) + r.reqsToID = make(map[[20]byte]map[uint32]ids.ID, minRequestsSize) } vdrKey := vdr.Key() vdrReqs, ok := r.reqsToID[vdrKey] @@ -33,7 +37,7 @@ func (r *Requests) Add(vdr ids.ShortID, requestID uint32, containerID ids.ID) { vdrReqs[requestID] = containerID if r.idToReq == nil { - r.idToReq = make(map[[32]byte]req) + r.idToReq = make(map[[32]byte]req, minRequestsSize) } r.idToReq[containerID.Key()] = req{ vdr: vdr, From 77d24022fefab4a7d8966cd69004e4b2dd1f6c85 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 16 Jun 2020 16:11:21 -0400 Subject: [PATCH 10/15] add minimumCacheSize --- cache/lru_cache.go | 8 +++-- cache/lru_cache_benchmark_test.go | 53 +++++++++++++++++++++++++++++++ 2 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 cache/lru_cache_benchmark_test.go diff --git a/cache/lru_cache.go b/cache/lru_cache.go index 629d6bd..ae04138 100644 --- a/cache/lru_cache.go +++ b/cache/lru_cache.go @@ -10,6 +10,10 @@ import ( "github.com/ava-labs/gecko/ids" ) +const ( + minCacheSize = 32 +) + type entry struct { Key ids.ID Value interface{} @@ -59,7 +63,7 @@ func (c *LRU) Flush() { func (c *LRU) init() { if c.entryMap == nil { - c.entryMap = make(map[[32]byte]*list.Element) + c.entryMap = make(map[[32]byte]*list.Element, minCacheSize) } if c.entryList == nil { c.entryList = list.New() @@ -134,6 +138,6 @@ func (c *LRU) evict(key ids.ID) { func (c *LRU) flush() { c.init() - c.entryMap = make(map[[32]byte]*list.Element) + c.entryMap = make(map[[32]byte]*list.Element, minCacheSize) c.entryList = list.New() } diff --git a/cache/lru_cache_benchmark_test.go b/cache/lru_cache_benchmark_test.go new file mode 100644 index 0000000..6bdbaf8 --- /dev/null +++ b/cache/lru_cache_benchmark_test.go @@ -0,0 +1,53 @@ +package cache + +import ( + "crypto/rand" + "testing" + + "github.com/ava-labs/gecko/ids" +) + +func BenchmarkLRUCachePutSmall(b *testing.B) { + smallLen := 5 + cache := &LRU{Size: smallLen} + for n := 0; n < b.N; n++ { + for i := 0; i < smallLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + cache.Put(ids.NewID(idBytes), n) + } + b.StopTimer() + cache.Flush() + b.StartTimer() + } +} + +func BenchmarkLRUCachePutMedium(b *testing.B) { + mediumLen := 250 + cache := &LRU{Size: mediumLen} + for n := 0; n < b.N; n++ { + for i := 0; i < mediumLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + cache.Put(ids.NewID(idBytes), n) + } + b.StopTimer() + cache.Flush() + b.StartTimer() + } +} + +func BenchmarkLRUCachePutLarge(b *testing.B) { + largeLen := 10000 + cache := &LRU{Size: largeLen} + for n := 0; n < b.N; n++ { + for i := 0; i < largeLen; i++ { + var idBytes [32]byte + rand.Read(idBytes[:]) + cache.Put(ids.NewID(idBytes), n) + } + b.StopTimer() + cache.Flush() + b.StartTimer() + } +} From 8edcb1689b6e28343638485f73a4bcc814b72dbe Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Tue, 16 Jun 2020 16:52:46 -0400 Subject: [PATCH 11/15] bump version for everest --- genesis/config.go | 118 ++++++++++++++++++++++++++++++++++++++++++ genesis/network_id.go | 6 ++- main/params.go | 2 +- node/node.go | 2 +- 4 files changed, 125 insertions(+), 3 deletions(-) diff --git a/genesis/config.go b/genesis/config.go index 7442ca0..832d694 100644 --- a/genesis/config.go +++ b/genesis/config.go @@ -50,6 +50,122 @@ func (c *Config) init() error { // Hard coded genesis constants var ( + EverestConfig = Config{ + MintAddresses: []string{ 
+ "95YUFjhDG892VePMzpwKF9JzewGKvGRi3", + }, + FundedAddresses: []string{ + "9uKvvA7E35QCwLvAaohXTCfFejbf3Rv17", + "JLrYNMYXANGj43BfWXBxMMAEenUBp1Sbn", + "7TUTzwrU6nbZtWHjTHEpdneUvjKBxb3EM", + "77mPUXBdQKwQpPoX6rckCZGLGGdkuG1G6", + "4gGWdFZ4Gax1B466YKXyKRRpWLb42Afdt", + "CKTkzAPsRxCreyiDTnjGxLmjMarxF28fi", + "4ABm9gFHVtsNdcKSd1xsacFkGneSgzpaa", + "DpL8PTsrjtLzv5J8LL3D2A6YcnCTqrNH9", + "ZdhZv6oZrmXLyFDy6ovXAu6VxmbTsT2h", + "6cesTteH62Y5mLoDBUASaBvCXuL2AthL", + }, + StakerIDs: []string{ + "LQwRLm4cbJ7T2kxcxp4uXCU5XD8DFrE1C", + "hArafGhY2HFTbwaaVh1CSCUCUCiJ2Vfb", + "2m38qc95mhHXtrhjyGbe7r2NhniqHHJRB", + "4QBwET5o8kUhvt9xArhir4d3R25CtmZho", + "NpagUxt6KQiwPch9Sd4osv8kD1TZnkjdk", + }, + EVMBytes: []byte{ + 0x7b, 0x22, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, + 0x22, 0x3a, 0x7b, 0x22, 0x63, 0x68, 0x61, 0x69, + 0x6e, 0x49, 0x64, 0x22, 0x3a, 0x34, 0x33, 0x31, + 0x31, 0x30, 0x2c, 0x22, 0x68, 0x6f, 0x6d, 0x65, + 0x73, 0x74, 0x65, 0x61, 0x64, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, 0x64, + 0x61, 0x6f, 0x46, 0x6f, 0x72, 0x6b, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, + 0x64, 0x61, 0x6f, 0x46, 0x6f, 0x72, 0x6b, 0x53, + 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x22, 0x3a, + 0x74, 0x72, 0x75, 0x65, 0x2c, 0x22, 0x65, 0x69, + 0x70, 0x31, 0x35, 0x30, 0x42, 0x6c, 0x6f, 0x63, + 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, 0x65, 0x69, + 0x70, 0x31, 0x35, 0x30, 0x48, 0x61, 0x73, 0x68, + 0x22, 0x3a, 0x22, 0x30, 0x78, 0x32, 0x30, 0x38, + 0x36, 0x37, 0x39, 0x39, 0x61, 0x65, 0x65, 0x62, + 0x65, 0x61, 0x65, 0x31, 0x33, 0x35, 0x63, 0x32, + 0x34, 0x36, 0x63, 0x36, 0x35, 0x30, 0x32, 0x31, + 0x63, 0x38, 0x32, 0x62, 0x34, 0x65, 0x31, 0x35, + 0x61, 0x32, 0x63, 0x34, 0x35, 0x31, 0x33, 0x34, + 0x30, 0x39, 0x39, 0x33, 0x61, 0x61, 0x63, 0x66, + 0x64, 0x32, 0x37, 0x35, 0x31, 0x38, 0x38, 0x36, + 0x35, 0x31, 0x34, 0x66, 0x30, 0x22, 0x2c, 0x22, + 0x65, 0x69, 0x70, 0x31, 0x35, 0x35, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, + 0x65, 0x69, 0x70, 0x31, 0x35, 0x38, 0x42, 0x6c, + 0x6f, 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x2c, 0x22, + 0x62, 0x79, 0x7a, 0x61, 0x6e, 0x74, 0x69, 0x75, + 0x6d, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x3a, + 0x30, 0x2c, 0x22, 0x63, 0x6f, 0x6e, 0x73, 0x74, + 0x61, 0x6e, 0x74, 0x69, 0x6e, 0x6f, 0x70, 0x6c, + 0x65, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x22, 0x3a, + 0x30, 0x2c, 0x22, 0x70, 0x65, 0x74, 0x65, 0x72, + 0x73, 0x62, 0x75, 0x72, 0x67, 0x42, 0x6c, 0x6f, + 0x63, 0x6b, 0x22, 0x3a, 0x30, 0x7d, 0x2c, 0x22, + 0x6e, 0x6f, 0x6e, 0x63, 0x65, 0x22, 0x3a, 0x22, + 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22, 0x74, 0x69, + 0x6d, 0x65, 0x73, 0x74, 0x61, 0x6d, 0x70, 0x22, + 0x3a, 0x22, 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22, + 0x65, 0x78, 0x74, 0x72, 0x61, 0x44, 0x61, 0x74, + 0x61, 0x22, 0x3a, 0x22, 0x30, 0x78, 0x30, 0x30, + 0x22, 0x2c, 0x22, 0x67, 0x61, 0x73, 0x4c, 0x69, + 0x6d, 0x69, 0x74, 0x22, 0x3a, 0x22, 0x30, 0x78, + 0x35, 0x66, 0x35, 0x65, 0x31, 0x30, 0x30, 0x22, + 0x2c, 0x22, 0x64, 0x69, 0x66, 0x66, 0x69, 0x63, + 0x75, 0x6c, 0x74, 0x79, 0x22, 0x3a, 0x22, 0x30, + 0x78, 0x30, 0x22, 0x2c, 0x22, 0x6d, 0x69, 0x78, + 0x48, 0x61, 0x73, 0x68, 0x22, 0x3a, 0x22, 0x30, + 0x78, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x22, 0x2c, 0x22, 0x63, 0x6f, 0x69, 
0x6e, + 0x62, 0x61, 0x73, 0x65, 0x22, 0x3a, 0x22, 0x30, + 0x78, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x22, 0x2c, 0x22, 0x61, 0x6c, 0x6c, 0x6f, + 0x63, 0x22, 0x3a, 0x7b, 0x22, 0x35, 0x37, 0x32, + 0x66, 0x34, 0x64, 0x38, 0x30, 0x66, 0x31, 0x30, + 0x66, 0x36, 0x36, 0x33, 0x62, 0x35, 0x30, 0x34, + 0x39, 0x66, 0x37, 0x38, 0x39, 0x35, 0x34, 0x36, + 0x66, 0x32, 0x35, 0x66, 0x37, 0x30, 0x62, 0x62, + 0x36, 0x32, 0x61, 0x37, 0x66, 0x22, 0x3a, 0x7b, + 0x22, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x65, + 0x22, 0x3a, 0x22, 0x30, 0x78, 0x33, 0x33, 0x62, + 0x32, 0x65, 0x33, 0x63, 0x39, 0x66, 0x64, 0x30, + 0x38, 0x30, 0x34, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x22, 0x7d, 0x7d, 0x2c, + 0x22, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, + 0x3a, 0x22, 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22, + 0x67, 0x61, 0x73, 0x55, 0x73, 0x65, 0x64, 0x22, + 0x3a, 0x22, 0x30, 0x78, 0x30, 0x22, 0x2c, 0x22, + 0x70, 0x61, 0x72, 0x65, 0x6e, 0x74, 0x48, 0x61, + 0x73, 0x68, 0x22, 0x3a, 0x22, 0x30, 0x78, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, + 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x30, 0x22, + 0x7d, + }, + } DenaliConfig = Config{ MintAddresses: []string{ "95YUFjhDG892VePMzpwKF9JzewGKvGRi3", @@ -393,6 +509,8 @@ var ( // GetConfig ... func GetConfig(networkID uint32) *Config { switch networkID { + case EverestID: + return &EverestConfig case DenaliID: return &DenaliConfig case CascadeID: diff --git a/genesis/network_id.go b/genesis/network_id.go index 7be7968..f318a36 100644 --- a/genesis/network_id.go +++ b/genesis/network_id.go @@ -16,13 +16,15 @@ var ( MainnetID uint32 = 1 CascadeID uint32 = 2 DenaliID uint32 = 3 + EverestID uint32 = 4 - TestnetID uint32 = 3 + TestnetID uint32 = 4 LocalID uint32 = 12345 MainnetName = "mainnet" CascadeName = "cascade" DenaliName = "denali" + EverestName = "everest" TestnetName = "testnet" LocalName = "local" @@ -31,6 +33,7 @@ var ( MainnetID: MainnetName, CascadeID: CascadeName, DenaliID: DenaliName, + EverestID: EverestName, LocalID: LocalName, } @@ -38,6 +41,7 @@ var ( MainnetName: MainnetID, CascadeName: CascadeID, DenaliName: DenaliID, + EverestName: EverestID, TestnetName: TestnetID, LocalName: LocalID, diff --git a/main/params.go b/main/params.go index 6dcad06..94db609 100644 --- a/main/params.go +++ b/main/params.go @@ -30,7 +30,7 @@ import ( ) const ( - dbVersion = "v0.5.0" + dbVersion = "v0.6.0" ) // Results of parsing the CLI diff --git a/node/node.go b/node/node.go index 5e817fa..942a746 100644 --- a/node/node.go +++ b/node/node.go @@ -56,7 +56,7 @@ var ( genesisHashKey = []byte("genesisID") // Version is the version of this code - Version = version.NewDefaultVersion("avalanche", 0, 5, 5) + Version = version.NewDefaultVersion("avalanche", 0, 6, 0) versionParser = version.NewDefaultParser() ) From aca163714d1dd076fce3fee0f19f350b7487cbed Mon Sep 17 00:00:00 2001 From: Stephen Buttolph Date: Tue, 16 Jun 2020 17:21:23 -0400 Subject: [PATCH 12/15] fixed typo --- ids/bag_benchmark_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git 
a/ids/bag_benchmark_test.go b/ids/bag_benchmark_test.go index e856505..7007ddb 100644 --- a/ids/bag_benchmark_test.go +++ b/ids/bag_benchmark_test.go @@ -37,7 +37,7 @@ func BenchmarkBagListMedium(b *testing.B) { } } -func BenchmarkBagListLarsge(b *testing.B) { +func BenchmarkBagListLarge(b *testing.B) { largeLen := 100000 bag := Bag{} for i := 0; i < largeLen; i++ { From e0d00e25c720119b46fc9f3001819a19bdef4c47 Mon Sep 17 00:00:00 2001 From: Dan Laine Date: Tue, 16 Jun 2020 17:23:48 -0400 Subject: [PATCH 13/15] fix typo --- ids/set_benchmark_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ids/set_benchmark_test.go b/ids/set_benchmark_test.go index c88b4f9..17c1c7a 100644 --- a/ids/set_benchmark_test.go +++ b/ids/set_benchmark_test.go @@ -37,7 +37,7 @@ func BenchmarkSetListMedium(b *testing.B) { } } -func BenchmarkSetListLarsge(b *testing.B) { +func BenchmarkSetListLarge(b *testing.B) { largeLen := 100000 set := Set{} for i := 0; i < largeLen; i++ { From aab8f5f3d46b17b0377de00dfbd42095b9c1d9ad Mon Sep 17 00:00:00 2001 From: Aaron Buchwald Date: Tue, 16 Jun 2020 16:28:58 -0400 Subject: [PATCH 14/15] Implement early termination case for avalanche polling --- snow/engine/avalanche/polls.go | 41 +++++++++++- snow/engine/avalanche/polls_test.go | 99 +++++++++++++++++++++++++++++ snow/engine/avalanche/transitive.go | 4 +- 3 files changed, 140 insertions(+), 4 deletions(-) create mode 100644 snow/engine/avalanche/polls_test.go diff --git a/snow/engine/avalanche/polls.go b/snow/engine/avalanche/polls.go index fa1e7df..3bf0498 100644 --- a/snow/engine/avalanche/polls.go +++ b/snow/engine/avalanche/polls.go @@ -32,9 +32,19 @@ import ( type polls struct { log logging.Logger numPolls prometheus.Gauge + alpha int m map[uint32]poll } +func newPolls(alpha int, log logging.Logger, numPolls prometheus.Gauge) polls { + return polls{ + log: log, + numPolls: numPolls, + alpha: alpha, + m: make(map[uint32]poll), + } +} + // Add to the current set of polls // Returns true if the poll was registered correctly and the network sample // should be made. @@ -42,6 +52,7 @@ func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool { poll, exists := p.m[requestID] if !exists { poll.polled = vdrs + poll.alpha = p.alpha p.m[requestID] = poll p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics @@ -85,6 +96,7 @@ func (p *polls) String() string { type poll struct { votes ids.UniqueBag polled ids.ShortSet + alpha int } // Vote registers a vote for this poll @@ -97,5 +109,32 @@ func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) { // Finished returns true if the poll has completed, with no more required // responses -func (p poll) Finished() bool { return p.polled.Len() == 0 } +func (p poll) Finished() bool { + // If I have no outstanding polls, I'm finished + numPending := p.polled.Len() + if numPending == 0 { + return true + } + // If there are still enough pending responses to include another vertex, + // then I can't stop polling + if numPending > p.alpha { + return false + } + + // I ignore any vertex that has already received alpha votes. + // To safely skip DAG traversal, assume that all votes for + // vertices with less than alpha votes will be applied to a + // single shared ancestor. + // In this case, I can terminate early, iff there are not enough + // pending votes for this ancestor to receive alpha votes. 
+ partialVotes := ids.BitSet(0) + for _, vote := range p.votes.List() { + voters := p.votes.GetSet(vote) + if voters.Len() >= p.alpha { + continue + } + partialVotes.Union(voters) + } + return partialVotes.Len()+numPending < p.alpha +} func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) } diff --git a/snow/engine/avalanche/polls_test.go b/snow/engine/avalanche/polls_test.go new file mode 100644 index 0000000..cbb1ea4 --- /dev/null +++ b/snow/engine/avalanche/polls_test.go @@ -0,0 +1,99 @@ +package avalanche + +import ( + "testing" + + "github.com/ava-labs/gecko/ids" +) + +func TestPollTerminatesEarlyVirtuousCase(t *testing.T) { + alpha := 3 + + vtxID := GenerateID() + votes := []ids.ID{vtxID} + + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) + vdr4 := ids.NewShortID([20]byte{4}) + vdr5 := ids.NewShortID([20]byte{5}) // k = 5 + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + vdrs.Add(vdr2) + vdrs.Add(vdr3) + vdrs.Add(vdr4) + vdrs.Add(vdr5) + + poll := poll{ + votes: make(ids.UniqueBag), + polled: vdrs, + alpha: alpha, + } + + poll.Vote(votes, vdr1) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(votes, vdr2) + if poll.Finished() { + t.Fatalf("Poll finished after less than alpha votes") + } + poll.Vote(votes, vdr3) + if !poll.Finished() { + t.Fatalf("Poll did not terminate early after receiving alpha votes for one vertex and none for other vertices") + } +} + +func TestPollAccountsForSharedAncestor(t *testing.T) { + alpha := 4 + + vtxA := GenerateID() + vtxB := GenerateID() + vtxC := GenerateID() + vtxD := GenerateID() + + // If validators 1-3 vote for frontier vertices + // B, C, and D respectively, which all share the common ancestor + // A, then we cannot terminate early with alpha = k = 4 + // If the final vote is cast for any of A, B, C, or D, then + // vertex A will have transitively received alpha = 4 votes + vdr1 := ids.NewShortID([20]byte{1}) + vdr2 := ids.NewShortID([20]byte{2}) + vdr3 := ids.NewShortID([20]byte{3}) + vdr4 := ids.NewShortID([20]byte{4}) + + vdrs := ids.ShortSet{} + vdrs.Add(vdr1) + vdrs.Add(vdr2) + vdrs.Add(vdr3) + vdrs.Add(vdr4) + + poll := poll{ + votes: make(ids.UniqueBag), + polled: vdrs, + alpha: alpha, + } + + votes1 := []ids.ID{vtxB} + poll.Vote(votes1, vdr1) + if poll.Finished() { + t.Fatalf("Poll finished early after receiving one vote") + } + votes2 := []ids.ID{vtxC} + poll.Vote(votes2, vdr2) + if poll.Finished() { + t.Fatalf("Poll finished early after receiving two votes") + } + votes3 := []ids.ID{vtxD} + poll.Vote(votes3, vdr3) + if poll.Finished() { + t.Fatalf("Poll terminated early, when a shared ancestor could have received alpha votes") + } + + votes4 := []ids.ID{vtxA} + poll.Vote(votes4, vdr4) + if !poll.Finished() { + t.Fatalf("Poll did not terminate after receiving all outstanding votes") + } +} diff --git a/snow/engine/avalanche/transitive.go b/snow/engine/avalanche/transitive.go index 71fbbe0..966e2e5 100644 --- a/snow/engine/avalanche/transitive.go +++ b/snow/engine/avalanche/transitive.go @@ -57,9 +57,7 @@ func (t *Transitive) Initialize(config Config) error { t.onFinished = t.finishBootstrapping - t.polls.log = config.Context.Log - t.polls.numPolls = t.numPolls - t.polls.m = make(map[uint32]poll) + t.polls = newPolls(int(config.Alpha), config.Context.Log, t.numPolls) return t.bootstrapper.Initialize(config.BootstrapConfig) } From 3d6fff70e072252e101bba491835bfd346821944 Mon Sep 17 00:00:00 2001 
From: StephenButtolph Date: Tue, 16 Jun 2020 23:53:19 -0400 Subject: [PATCH 15/15] nits to clean up the PR --- snow/engine/avalanche/polls.go | 21 +++++++++------------ 1 file changed, 9 insertions(+), 12 deletions(-) diff --git a/snow/engine/avalanche/polls.go b/snow/engine/avalanche/polls.go index 3bf0498..ac3fe5c 100644 --- a/snow/engine/avalanche/polls.go +++ b/snow/engine/avalanche/polls.go @@ -110,30 +110,27 @@ func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) { // Finished returns true if the poll has completed, with no more required // responses func (p poll) Finished() bool { - // If I have no outstanding polls, I'm finished + // If there are no outstanding queries, the poll is finished numPending := p.polled.Len() if numPending == 0 { return true } // If there are still enough pending responses to include another vertex, - // then I can't stop polling + // then the poll must wait for more responses if numPending > p.alpha { return false } - // I ignore any vertex that has already received alpha votes. - // To safely skip DAG traversal, assume that all votes for - // vertices with less than alpha votes will be applied to a - // single shared ancestor. - // In this case, I can terminate early, iff there are not enough - // pending votes for this ancestor to receive alpha votes. + // Ignore any vertex that has already received alpha votes. To safely skip + // DAG traversal, assume that all votes for vertices with less than alpha + // votes will be applied to a single shared ancestor. In this case, the poll + // can terminate early, iff there are not enough pending votes for this + // ancestor to receive alpha votes. partialVotes := ids.BitSet(0) for _, vote := range p.votes.List() { - voters := p.votes.GetSet(vote) - if voters.Len() >= p.alpha { - continue + if voters := p.votes.GetSet(vote); voters.Len() < p.alpha { + partialVotes.Union(voters) } - partialVotes.Union(voters) } return partialVotes.Len()+numPending < p.alpha }
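To make the early-termination rule in poll.Finished concrete, here is a minimal standalone sketch of the same decision. It is illustrative only: canFinishEarly, the votes map keyed by vertex ID, and the plain string voter IDs are stand-ins for the real poll type, ids.UniqueBag, and ids.ShortSet used in the patches above; the two calls in main mirror the scenarios exercised by TestPollTerminatesEarlyVirtuousCase and TestPollAccountsForSharedAncestor.

package main

import "fmt"

// canFinishEarly mirrors the decision in poll.Finished. votes maps a vertex ID
// to the validators that voted for it, numPending is the number of outstanding
// responses, and alpha is the quorum size.
func canFinishEarly(votes map[string][]string, numPending, alpha int) bool {
	// No outstanding responses: the poll is trivially finished.
	if numPending == 0 {
		return true
	}
	// Enough pending responses remain to push a brand new vertex to alpha, so
	// the poll must keep waiting.
	if numPending > alpha {
		return false
	}
	// Union the voters behind every vertex that is still short of alpha votes
	// and assume, pessimistically, that they all back one shared ancestor.
	partial := make(map[string]bool)
	for _, voters := range votes {
		if len(voters) < alpha {
			for _, vdr := range voters {
				partial[vdr] = true
			}
		}
	}
	// Even if every pending response also voted for that ancestor, it could
	// not reach alpha, so the poll can terminate early.
	return len(partial)+numPending < alpha
}

func main() {
	// Virtuous case (k = 5, alpha = 3): one vertex already holds 3 votes and
	// the 2 outstanding responses cannot lift anything else to alpha.
	fmt.Println(canFinishEarly(map[string][]string{
		"vtx0": {"vdr1", "vdr2", "vdr3"},
	}, 2, 3)) // true

	// Shared-ancestor case (k = alpha = 4): three frontier vertices hold 1 vote
	// each; with 1 response outstanding their common ancestor could still reach
	// 4 votes, so the poll must wait.
	fmt.Println(canFinishEarly(map[string][]string{
		"vtxB": {"vdr1"}, "vtxC": {"vdr2"}, "vtxD": {"vdr3"},
	}, 1, 4)) // false
}

Note that the voters are unioned into a set rather than summed, matching the BitSet union in the patch, so a validator that voted for several sub-alpha vertices is only counted once.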