Merge branch 'master' into set-cache

commit 89ab9fce4b
Stephen Buttolph 2020-06-16 16:28:31 -04:00, committed by GitHub
GPG Key ID: 4AEE18F83AFDEB23 (no known key found for this signature in database)
17 changed files with 690 additions and 197 deletions

1
go.mod
View File

@ -33,6 +33,7 @@ require (
github.com/olekukonko/tablewriter v0.0.4 // indirect
github.com/pborman/uuid v1.2.0 // indirect
github.com/prometheus/client_golang v1.6.0
github.com/prometheus/common v0.9.1
github.com/prometheus/tsdb v0.10.0 // indirect
github.com/rjeczalik/notify v0.9.2 // indirect
github.com/rs/cors v1.7.0

4
go.sum
View File

@ -8,8 +8,10 @@ github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAE
github.com/Shopify/sarama v1.26.1/go.mod h1:NbSGBSSndYaIhRcBtY9V0U7AyH+x71bG668AuWys/yU=
github.com/Shopify/toxiproxy v2.1.4+incompatible/go.mod h1:OXgGpZ6Cli1/URJOF1DMxUHB2q5Ap20/P/eIdh4G0pI=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751 h1:JYp7IbQjafoB+tBA3gMyHYHrpOtNuDiK/uB5uXxq5wM=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4 h1:Hs82Z41s6SdL1CELW+XaDYmOH4hkBN4/N9og/AsOv7E=
github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/allegro/bigcache v1.2.1 h1:hg1sY1raCwic3Vnsvje6TT7/pnZba83LeFck5NrFKSc=
github.com/allegro/bigcache v1.2.1/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
@ -218,6 +220,7 @@ github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik=
github.com/rs/cors v1.7.0/go.mod h1:gFx+x8UowdsKA9AchylcLynDq+nNFfI8FkUZdN/jGCU=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo=
github.com/sirupsen/logrus v1.4.2 h1:SPIRibHv4MatM3XXNO2BJeFLZwZ2LvZgfQ5+UNI2im4=
github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE=
github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA=
github.com/status-im/keycard-go v0.0.0-20200402102358-957c09536969 h1:Oo2KZNP70KE0+IUJSidPj/BFS/RXNHmKIJOdckzml2E=
@ -337,6 +340,7 @@ google.golang.org/protobuf v1.20.1-0.20200309200217-e05f789c0967/go.mod h1:A+miE
google.golang.org/protobuf v1.21.0/go.mod h1:47Nbq4nVaFHyn7ilMalzfO3qCViNmqZ2kzikPIcrTAo=
google.golang.org/protobuf v1.23.0 h1:4MY060fB1DLGMB/7MBTLnwQUY6+F09GEiz6SsrNqyzM=
google.golang.org/protobuf v1.23.0/go.mod h1:EGpADcykh3NcUnDUJcl1+ZksZNG86OlYog2l/sGQquU=
gopkg.in/alecthomas/kingpin.v2 v2.2.6 h1:jMFz6MfLP0/4fUyZle81rXUoxOBFi19VUFKVDOQfozc=
gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw=
gopkg.in/bsm/ratelimit.v1 v1.0.0-20160220154919-db14e161995a/go.mod h1:KF9sEfUPAXdG8Oev9e99iLGnl2uJMjc5B+4y3O7x610=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=

View File

@ -231,6 +231,7 @@ func (ta *Topological) pushVotes(
kahnNodes map[[32]byte]kahnNode,
leaves []ids.ID) ids.Bag {
votes := make(ids.UniqueBag)
txConflicts := make(map[[32]byte]ids.Set)
for len(leaves) > 0 {
newLeavesSize := len(leaves) - 1
@ -245,6 +246,12 @@ func (ta *Topological) pushVotes(
// Give the votes to the consumer
txID := tx.ID()
votes.UnionSet(txID, kahn.votes)
// Map txID to set of Conflicts
txKey := txID.Key()
if _, exists := txConflicts[txKey]; !exists {
txConflicts[txKey] = ta.cg.Conflicts(tx)
}
}
for _, dep := range vtx.Parents() {
@ -265,6 +272,18 @@ func (ta *Topological) pushVotes(
}
}
// Create bag of votes for conflicting transactions
conflictingVotes := make(ids.UniqueBag)
for txHash, conflicts := range txConflicts {
txID := ids.NewID(txHash)
for conflictTxHash := range conflicts {
conflictTxID := ids.NewID(conflictTxHash)
conflictingVotes.UnionSet(txID, votes.GetSet(conflictTxID))
}
}
votes.Difference(&conflictingVotes)
return votes.Bag(ta.params.Alpha)
}
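
The hunk above subtracts, from each transaction's vote set, every vote that was also counted for one of its conflicts, so a response that (illegally) backs two conflicting transactions ends up counting for neither. A standalone sketch of that filtering, using plain maps in place of the gecko ids.UniqueBag and ids.Set types (hypothetical names, not the real API):

// Standalone sketch of the conflict-vote filtering added above.
// Plain maps stand in for ids.UniqueBag / ids.Set; responders are ints.
package main

import "fmt"

func main() {
	// votes[txID] = set of responders whose chits support txID
	votes := map[string]map[int]bool{
		"tx0": {0: true, 2: true},
		"tx1": {1: true, 2: true}, // responder 2 voted for both conflicting txs
	}
	conflicts := map[string][]string{"tx0": {"tx1"}, "tx1": {"tx0"}}

	// First collect, per transaction, the votes its conflicts received...
	conflicting := map[string]map[int]bool{}
	for txID := range votes {
		conflicting[txID] = map[int]bool{}
		for _, c := range conflicts[txID] {
			for responder := range votes[c] {
				conflicting[txID][responder] = true
			}
		}
	}
	// ...then subtract them, mirroring votes.Difference(&conflictingVotes).
	for txID, bad := range conflicting {
		for responder := range bad {
			delete(votes[txID], responder)
		}
	}
	fmt.Println(len(votes["tx0"]), len(votes["tx1"])) // 1 1: the double vote counts for neither
}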

View File

@ -104,6 +104,78 @@ func TestAvalancheVoting(t *testing.T) {
}
}
func TestAvalancheIgnoreInvalidVoting(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 3,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 1,
},
Parents: 2,
BatchSize: 1,
}
vts := []Vertex{&Vtx{
id: GenerateID(),
status: choices.Accepted,
}, &Vtx{
id: GenerateID(),
status: choices.Accepted,
}}
utxos := []ids.ID{GenerateID()}
ta := Topological{}
ta.Initialize(snow.DefaultContextTest(), params, vts)
tx0 := &snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
}
tx0.Ins.Add(utxos[0])
vtx0 := &Vtx{
dependencies: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx0},
height: 1,
status: choices.Processing,
}
tx1 := &snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
}
tx1.Ins.Add(utxos[0])
vtx1 := &Vtx{
dependencies: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx1},
height: 1,
status: choices.Processing,
}
ta.Add(vtx0)
ta.Add(vtx1)
sm := make(ids.UniqueBag)
sm.Add(0, vtx0.id)
sm.Add(1, vtx1.id)
// Add Illegal Vote cast by Response 2
sm.Add(2, vtx0.id)
sm.Add(2, vtx1.id)
ta.RecordPoll(sm)
if ta.Finalized() {
t.Fatalf("An avalanche instance finalized too early")
}
}
func TestAvalancheTransitiveVoting(t *testing.T) {
params := Parameters{
Parameters: snowball.Parameters{

View File

@ -27,11 +27,13 @@ func (sb *unarySnowball) Extend(beta int, choice int) BinarySnowball {
bs := &binarySnowball{
binarySnowflake: binarySnowflake{
binarySlush: binarySlush{preference: choice},
confidence: sb.confidence,
beta: beta,
finalized: sb.Finalized(),
},
preference: choice,
}
bs.numSuccessfulPolls[choice] = sb.numSuccessfulPolls
return bs
}
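
The fix above makes Extend carry the unary instance's accumulated confidence and finalized flag into the binary snowflake it produces, instead of restarting them at zero. A simplified standalone sketch of that behaviour (toy counters, not the real snowball types):

// Simplified sketch: extending a unary counter into a binary one should
// preserve the confidence already accumulated, as the hunk above now does.
package main

import "fmt"

type unary struct{ confidence, beta int }

func (u *unary) RecordSuccessfulPoll() { u.confidence++ }
func (u *unary) Finalized() bool       { return u.confidence >= u.beta }

type binary struct {
	preference, confidence, beta int
	finalized                    bool
}

// Extend mirrors the fixed behaviour: confidence and finalization carry over.
func (u *unary) Extend(choice int) *binary {
	return &binary{
		preference: choice,
		confidence: u.confidence, // previously reset to zero; now carried over
		beta:       u.beta,
		finalized:  u.Finalized(),
	}
}

func main() {
	u := &unary{beta: 2}
	u.RecordSuccessfulPoll()
	b := u.Extend(0)
	fmt.Println(b.confidence, b.finalized) // 1 false: one more successful poll finalizes it
}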

View File

@ -42,11 +42,32 @@ func TestUnarySnowball(t *testing.T) {
binarySnowball := sbClone.Extend(beta, 0)
expected := "SB(Preference = 0, NumSuccessfulPolls[0] = 2, NumSuccessfulPolls[1] = 0, SF(Confidence = 1, Finalized = false, SL(Preference = 0)))"
if result := binarySnowball.String(); result != expected {
t.Fatalf("Expected:\n%s\nReturned:\n%s", expected, result)
}
binarySnowball.RecordUnsuccessfulPoll()
for i := 0; i < 3; i++ {
if binarySnowball.Preference() != 0 {
t.Fatalf("Wrong preference")
} else if binarySnowball.Finalized() {
t.Fatalf("Should not have finalized")
}
binarySnowball.RecordSuccessfulPoll(1)
binarySnowball.RecordUnsuccessfulPoll()
}
if binarySnowball.Preference() != 1 {
t.Fatalf("Wrong preference")
} else if binarySnowball.Finalized() {
t.Fatalf("Should not have finalized")
}
binarySnowball.RecordSuccessfulPoll(1)
if binarySnowball.Finalized() {
if binarySnowball.Preference() != 1 {
t.Fatalf("Wrong preference")
} else if binarySnowball.Finalized() {
t.Fatalf("Should not have finalized")
}
@ -57,4 +78,9 @@ func TestUnarySnowball(t *testing.T) {
} else if !binarySnowball.Finalized() {
t.Fatalf("Should have finalized")
}
expected = "SB(NumSuccessfulPolls = 2, SF(Confidence = 1, Finalized = false))"
if str := sb.String(); str != expected {
t.Fatalf("Wrong state. Expected:\n%s\nGot:\n%s", expected, str)
}
}

View File

@ -78,9 +78,12 @@ func (i *issuer) Update() {
vdrSet.Add(vdr.ID())
}
toSample := ids.ShortSet{} // Copy to a new variable because we may remove an element in sender.Sender
toSample.Union(vdrSet) // and we don't want that to affect the set of validators we wait for [ie vdrSet]
i.t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet.Len()) {
i.t.Config.Sender.PushQuery(vdrSet, i.t.RequestID, vtxID, i.vtx.Bytes())
if numVdrs := len(vdrs); numVdrs == p.K && i.t.polls.Add(i.t.RequestID, vdrSet) {
i.t.Config.Sender.PushQuery(toSample, i.t.RequestID, vtxID, i.vtx.Bytes())
} else if numVdrs < p.K {
i.t.Config.Context.Log.Error("Query for %s was dropped due to an insufficient number of validators", vtxID)
}
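
The copy above exists because ids.ShortSet behaves like a reference: the sender may strip the local node out of whatever set it is handed, and the poll must keep waiting on the full set of queried validators. A minimal sketch of the aliasing hazard with a toy map-backed set (hypothetical helpers, not the gecko sender):

// Sketch of the aliasing bug the copy above guards against: if the poll and
// the sender share one set, the sender's mutation shrinks the poll's set too.
package main

import "fmt"

type shortSet map[string]bool

func (s shortSet) union(o shortSet) {
	for k := range o {
		s[k] = true
	}
}

func sendQuery(vdrs shortSet) {
	delete(vdrs, "self") // the sender strips the local node before going to the network
}

func main() {
	vdrSet := shortSet{"self": true, "peer": true}

	// Without a copy the poll would end up waiting on one validator instead of two.
	toSample := shortSet{}
	toSample.union(vdrSet)

	waitingOn := len(vdrSet) // the poll keeps the original, uncopied set
	sendQuery(toSample)

	fmt.Println(waitingOn, len(vdrSet), len(toSample)) // 2 2 1
}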

View File

@ -38,10 +38,10 @@ type polls struct {
// Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample
// should be made.
func (p *polls) Add(requestID uint32, numPolled int) bool {
func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool {
poll, exists := p.m[requestID]
if !exists {
poll.numPending = numPolled
poll.polled = vdrs
p.m[requestID] = poll
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -59,7 +59,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, votes []ids.ID) (ids.Uni
return nil, false
}
poll.Vote(votes)
poll.Vote(votes, vdr)
if poll.Finished() {
p.log.Verbo("Poll is finished")
delete(p.m, requestID)
@ -83,19 +83,19 @@ func (p *polls) String() string {
// poll represents the current state of a network poll for a vertex
type poll struct {
votes ids.UniqueBag
numPending int
votes ids.UniqueBag
polled ids.ShortSet
}
// Vote registers a vote for this poll
func (p *poll) Vote(votes []ids.ID) {
if p.numPending > 0 {
p.numPending--
p.votes.Add(uint(p.numPending), votes...)
func (p *poll) Vote(votes []ids.ID, vdr ids.ShortID) {
if p.polled.Contains(vdr) {
p.polled.Remove(vdr)
p.votes.Add(uint(p.polled.Len()), votes...)
}
}
// Finished returns true if the poll has completed, with no more required
// responses
func (p poll) Finished() bool { return p.numPending <= 0 }
func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.numPending) }
func (p poll) Finished() bool { return p.polled.Len() == 0 }
func (p poll) String() string { return fmt.Sprintf("Waiting on %d chits", p.polled.Len()) }
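
Tracking the set of validators still owed a response, rather than a bare numPending counter, is what lets the poll drop a duplicate chit: a second vote from the same validator no longer decrements the count. A small standalone sketch of the dedup (a string-keyed map stands in for ids.ShortSet):

// Sketch of the poll dedup above: only validators still in the polled set may
// vote, and each may vote at most once.
package main

import "fmt"

type poll struct {
	polled map[string]bool // validators we are still waiting on
	votes  int
}

func (p *poll) vote(vdr string) {
	if p.polled[vdr] { // a repeated or unsolicited chit is ignored
		delete(p.polled, vdr)
		p.votes++
	}
}

func (p *poll) finished() bool { return len(p.polled) == 0 }

func main() {
	p := &poll{polled: map[string]bool{"vdr0": true, "vdr1": true}}
	p.vote("vdr0")
	p.vote("vdr0") // duplicate: dropped instead of finishing the poll early
	fmt.Println(p.votes, p.finished()) // 1 false
	p.vote("vdr1")
	fmt.Println(p.votes, p.finished()) // 2 true
}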

View File

@ -471,8 +471,11 @@ func (t *Transitive) issueRepoll() {
vdrSet.Add(vdr.ID())
}
vdrCopy := ids.ShortSet{}
vdrCopy.Union(vdrSet)
t.RequestID++
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet.Len()) {
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrCopy) {
t.Config.Sender.PullQuery(vdrSet, t.RequestID, vtxID)
} else if numVdrs < p.K {
t.Config.Context.Log.Error("re-query for %s was dropped due to an insufficient number of validators", vtxID)

View File

@ -3085,3 +3085,120 @@ func TestEngineDuplicatedIssuance(t *testing.T) {
te.Notify(common.PendingTxs)
}
func TestEngineDoubleChit(t *testing.T) {
config := DefaultConfig()
config.Params.Alpha = 2
config.Params.K = 2
vdr0 := validators.GenerateRandomValidator(1)
vdr1 := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
vals.Add(vdr0)
vals.Add(vdr1)
config.Validators = vals
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
sender.CantGetAcceptedFrontier = false
st := &stateTest{t: t}
config.State = st
st.Default(true)
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
}
mVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
}
vts := []avalanche.Vertex{gVtx, mVtx}
utxos := []ids.ID{GenerateID()}
tx := &TestTx{
TestTx: snowstorm.TestTx{
Identifier: GenerateID(),
Stat: choices.Processing,
},
}
tx.Ins.Add(utxos[0])
vtx := &Vtx{
parents: vts,
id: GenerateID(),
txs: []snowstorm.Tx{tx},
height: 1,
status: choices.Processing,
bytes: []byte{1, 1, 2, 3},
}
st.edge = func() []ids.ID { return []ids.ID{vts[0].ID(), vts[1].ID()} }
st.getVertex = func(id ids.ID) (avalanche.Vertex, error) {
switch {
case id.Equals(gVtx.ID()):
return gVtx, nil
case id.Equals(mVtx.ID()):
return mVtx, nil
}
t.Fatalf("Unknown vertex")
panic("Should have errored")
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
reqID := new(uint32)
sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, vtxID ids.ID, _ []byte) {
*reqID = requestID
if inVdrs.Len() != 2 {
t.Fatalf("Wrong number of validators")
}
if !vtxID.Equals(vtx.ID()) {
t.Fatalf("Wrong vertex requested")
}
}
st.getVertex = func(id ids.ID) (avalanche.Vertex, error) {
switch {
case id.Equals(vtx.ID()):
return vtx, nil
}
t.Fatalf("Unknown vertex")
panic("Should have errored")
}
te.insert(vtx)
votes := ids.Set{}
votes.Add(vtx.ID())
if status := tx.Status(); status != choices.Processing {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *reqID, votes)
if status := tx.Status(); status != choices.Processing {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *reqID, votes)
if status := tx.Status(); status != choices.Processing {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr1.ID(), *reqID, votes)
if status := tx.Status(); status != choices.Accepted {
t.Fatalf("Wrong tx status: %s ; expected: %s", status, choices.Accepted)
}
}

View File

@ -22,11 +22,11 @@ type polls struct {
// Add to the current set of polls
// Returns true if the poll was registered correctly and the network sample
// should be made.
func (p *polls) Add(requestID uint32, numPolled int) bool {
func (p *polls) Add(requestID uint32, vdrs ids.ShortSet) bool {
poll, exists := p.m[requestID]
if !exists {
poll.alpha = p.alpha
poll.numPolled = numPolled
poll.polled = vdrs
p.m[requestID] = poll
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -42,7 +42,7 @@ func (p *polls) Vote(requestID uint32, vdr ids.ShortID, vote ids.ID) (ids.Bag, b
if !exists {
return ids.Bag{}, false
}
poll.Vote(vote)
poll.Vote(vote, vdr)
if poll.Finished() {
delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -60,7 +60,7 @@ func (p *polls) CancelVote(requestID uint32, vdr ids.ShortID) (ids.Bag, bool) {
return ids.Bag{}, false
}
poll.CancelVote()
poll.CancelVote(vdr)
if poll.Finished() {
delete(p.m, requestID)
p.numPolls.Set(float64(len(p.m))) // Tracks performance statistics
@ -83,22 +83,18 @@ func (p *polls) String() string {
// poll represents the current state of a network poll for a block
type poll struct {
alpha int
votes ids.Bag
numPolled int
alpha int
votes ids.Bag
polled ids.ShortSet
}
// Vote registers a vote for this poll
func (p *poll) CancelVote() {
if p.numPolled > 0 {
p.numPolled--
}
}
func (p *poll) CancelVote(vdr ids.ShortID) { p.polled.Remove(vdr) }
// Vote registers a vote for this poll
func (p *poll) Vote(vote ids.ID) {
if p.numPolled > 0 {
p.numPolled--
func (p *poll) Vote(vote ids.ID, vdr ids.ShortID) {
if p.polled.Contains(vdr) {
p.polled.Remove(vdr)
p.votes.Add(vote)
}
}
@ -106,13 +102,14 @@ func (p *poll) Vote(vote ids.ID) {
// Finished returns true if the poll has completed, with no more required
// responses
func (p poll) Finished() bool {
remaining := p.polled.Len()
received := p.votes.Len()
_, freq := p.votes.Mode()
return p.numPolled == 0 || // All k nodes responded
return remaining == 0 || // All k nodes responded
freq >= p.alpha || // An alpha majority has returned
received+p.numPolled < p.alpha // An alpha majority can never return
received+remaining < p.alpha // An alpha majority can never return
}
func (p poll) String() string {
return fmt.Sprintf("Waiting on %d chits", p.numPolled)
return fmt.Sprintf("Waiting on %d chits from %s", p.polled.Len(), p.polled)
}
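
The snowman poll keeps the same per-validator dedup and adds two early exits: finish as soon as some choice reaches alpha, or as soon as enough queries have failed that alpha can no longer be reached. A standalone sketch of the three termination checks (plain ints stand in for the vote bag):

// Sketch of the three termination checks in Finished above: every polled
// validator has answered, an alpha majority has already formed, or so many
// queries have failed that an alpha majority can no longer form.
package main

import "fmt"

func finished(remaining, received, topFreq, alpha int) bool {
	return remaining == 0 || // all polled validators responded
		topFreq >= alpha || // an alpha majority has returned
		received+remaining < alpha // an alpha majority can never return
}

func main() {
	const alpha = 4 // with k = 5 validators polled

	// Two matching chits in, three validators still pending: keep waiting.
	fmt.Println(finished(3, 2, 2, alpha)) // false

	// Four matching chits in: alpha reached, finish early.
	fmt.Println(finished(1, 4, 4, alpha)) // true

	// One chit in, two pending, two queries failed: 1+2 < alpha, finish early.
	fmt.Println(finished(2, 1, 1, alpha)) // true
}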

View File

@ -542,18 +542,15 @@ func (t *Transitive) pullSample(blkID ids.ID) {
vdrSet.Add(vdr.ID())
}
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
return
}
toSample := ids.ShortSet{}
toSample.Union(vdrSet)
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID)
return
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) {
t.Config.Sender.PullQuery(toSample, t.RequestID, blkID)
} else if numVdrs < p.K {
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
}
t.Config.Sender.PullQuery(vdrSet, t.RequestID, blkID)
}
// send a push request for this block
@ -566,20 +563,15 @@ func (t *Transitive) pushSample(blk snowman.Block) {
vdrSet.Add(vdr.ID())
}
blkID := blk.ID()
if numVdrs := len(vdrs); numVdrs != p.K {
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blkID)
return
}
toSample := ids.ShortSet{}
toSample.Union(vdrSet)
t.RequestID++
if !t.polls.Add(t.RequestID, vdrSet.Len()) {
t.Config.Context.Log.Error("query for %s was dropped due to use of a duplicated requestID", blkID)
return
if numVdrs := len(vdrs); numVdrs == p.K && t.polls.Add(t.RequestID, vdrSet) {
t.Config.Sender.PushQuery(toSample, t.RequestID, blk.ID(), blk.Bytes())
} else if numVdrs < p.K {
t.Config.Context.Log.Error("query for %s was dropped due to an insufficient number of validators", blk.ID())
}
t.Config.Sender.PushQuery(vdrSet, t.RequestID, blkID, blk.Bytes())
return
}
func (t *Transitive) deliver(blk snowman.Block) error {

View File

@ -1522,3 +1522,124 @@ func TestEngineAggressivePolling(t *testing.T) {
t.Fatalf("Should have sent an additional pull query")
}
}
func TestEngineDoubleChit(t *testing.T) {
config := DefaultConfig()
config.Params = snowball.Parameters{
Metrics: prometheus.NewRegistry(),
K: 2,
Alpha: 2,
BetaVirtuous: 1,
BetaRogue: 2,
}
vdr0 := validators.GenerateRandomValidator(1)
vdr1 := validators.GenerateRandomValidator(1)
vals := validators.NewSet()
config.Validators = vals
vals.Add(vdr0)
vals.Add(vdr1)
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
vm := &VMTest{}
vm.T = t
config.VM = vm
vm.Default(true)
vm.CantSetPreference = false
gBlk := &Blk{
id: GenerateID(),
status: choices.Accepted,
}
vm.LastAcceptedF = func() ids.ID { return gBlk.ID() }
sender.CantGetAcceptedFrontier = false
vm.GetBlockF = func(id ids.ID) (snowman.Block, error) {
switch {
case id.Equals(gBlk.ID()):
return gBlk, nil
}
t.Fatalf("Unknown block")
panic("Should have errored")
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
vm.LastAcceptedF = nil
sender.CantGetAcceptedFrontier = true
blk := &Blk{
parent: gBlk,
id: GenerateID(),
status: choices.Processing,
bytes: []byte{1},
}
queried := new(bool)
queryRequestID := new(uint32)
sender.PushQueryF = func(inVdrs ids.ShortSet, requestID uint32, blkID ids.ID, blkBytes []byte) {
if *queried {
t.Fatalf("Asked multiple times")
}
*queried = true
*queryRequestID = requestID
vdrSet := ids.ShortSet{}
vdrSet.Add(vdr0.ID(), vdr1.ID())
if !inVdrs.Equals(vdrSet) {
t.Fatalf("Asking wrong validator for preference")
}
if !blk.ID().Equals(blkID) {
t.Fatalf("Asking for wrong block")
}
}
te.insert(blk)
vm.GetBlockF = func(id ids.ID) (snowman.Block, error) {
switch {
case id.Equals(gBlk.ID()):
return gBlk, nil
case id.Equals(blk.ID()):
return blk, nil
}
t.Fatalf("Unknown block")
panic("Should have errored")
}
blkSet := ids.Set{}
blkSet.Add(blk.ID())
if status := blk.Status(); status != choices.Processing {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *queryRequestID, blkSet)
if status := blk.Status(); status != choices.Processing {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr0.ID(), *queryRequestID, blkSet)
if status := blk.Status(); status != choices.Processing {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Processing)
}
te.Chits(vdr1.ID(), *queryRequestID, blkSet)
if status := blk.Status(); status != choices.Accepted {
t.Fatalf("Wrong status: %s ; expected: %s", status, choices.Accepted)
}
}

View File

@ -124,19 +124,12 @@ func (sr *ChainRouter) GetAcceptedFrontierFailed(validatorID ids.ShortID, chainI
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetAcceptedFrontierFailed(validatorID, requestID) {
sr.log.Debug("deferring GetAcceptedFrontier timeout due to a full queue on %s", chainID)
// Defer this call to later
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.GetAcceptedFrontierFailed(validatorID, chainID, requestID)
})
return
}
chain.GetAcceptedFrontierFailed(validatorID, requestID)
} else {
sr.log.Error("GetAcceptedFrontierFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// GetAccepted routes an incoming GetAccepted request from the
@ -176,18 +169,12 @@ func (sr *ChainRouter) GetAcceptedFailed(validatorID ids.ShortID, chainID ids.ID
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetAcceptedFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring GetAccepted timeout due to a full queue on %s", chainID)
sr.GetAcceptedFailed(validatorID, chainID, requestID)
})
return
}
chain.GetAcceptedFailed(validatorID, requestID)
} else {
sr.log.Error("GetAcceptedFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// GetAncestors routes an incoming GetAncestors message from the validator with ID [validatorID]
@ -227,18 +214,12 @@ func (sr *ChainRouter) GetAncestorsFailed(validatorID ids.ShortID, chainID ids.I
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetAncestorsFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring GetAncestors timeout due to a full queue on %s", chainID)
sr.GetAncestorsFailed(validatorID, chainID, requestID)
})
return
}
chain.GetAncestorsFailed(validatorID, requestID)
} else {
sr.log.Error("GetAncestorsFailed(%s, %s, %d, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// Get routes an incoming Get request from the validator with ID [validatorID]
@ -278,18 +259,12 @@ func (sr *ChainRouter) GetFailed(validatorID ids.ShortID, chainID ids.ID, reques
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.GetFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring Get timeout due to a full queue on %s", chainID)
sr.GetFailed(validatorID, chainID, requestID)
})
return
}
chain.GetFailed(validatorID, requestID)
} else {
sr.log.Error("GetFailed(%s, %s, %d) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// PushQuery routes an incoming PushQuery request from the validator with ID [validatorID]
@ -341,18 +316,12 @@ func (sr *ChainRouter) QueryFailed(validatorID ids.ShortID, chainID ids.ID, requ
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.timeouts.Cancel(validatorID, chainID, requestID)
if chain, exists := sr.chains[chainID.Key()]; exists {
if !chain.QueryFailed(validatorID, requestID) {
sr.timeouts.Register(validatorID, chainID, requestID, func() {
sr.log.Debug("deferring Query timeout due to a full queue on %s", chainID)
sr.QueryFailed(validatorID, chainID, requestID)
})
return
}
chain.QueryFailed(validatorID, requestID)
} else {
sr.log.Error("QueryFailed(%s, %s, %d, %s) dropped due to unknown chain", validatorID, chainID, requestID)
}
sr.timeouts.Cancel(validatorID, chainID, requestID)
}
// Shutdown shuts down this router

View File

@ -4,12 +4,14 @@
package router
import (
"sync"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/prometheus/client_golang/prometheus"
)
// Handler passes incoming messages from the network to the consensus engine
@ -17,12 +19,18 @@ import (
type Handler struct {
metrics
msgs chan message
closed chan struct{}
engine common.Engine
msgChan <-chan common.Message
msgs chan message
reliableMsgsSema chan struct{}
reliableMsgsLock sync.Mutex
reliableMsgs []message
closed chan struct{}
msgChan <-chan common.Message
ctx *snow.Context
engine common.Engine
toClose func()
closing bool
}
// Initialize this consensus handler
@ -35,9 +43,12 @@ func (h *Handler) Initialize(
) {
h.metrics.Initialize(namespace, metrics)
h.msgs = make(chan message, bufferSize)
h.reliableMsgsSema = make(chan struct{}, 1)
h.closed = make(chan struct{})
h.engine = engine
h.msgChan = msgChan
h.ctx = engine.Context()
h.engine = engine
}
// Context of this Handler
@ -46,37 +57,38 @@ func (h *Handler) Context() *snow.Context { return h.engine.Context() }
// Dispatch waits for incoming messages from the network
// and, when they arrive, sends them to the consensus engine
func (h *Handler) Dispatch() {
log := h.Context().Log
defer func() {
log.Info("finished shutting down chain")
h.ctx.Log.Info("finished shutting down chain")
close(h.closed)
}()
closing := false
for {
select {
case msg, ok := <-h.msgs:
if !ok {
// the msgs channel has been closed, so this dispatcher should exit
return
}
h.metrics.pending.Dec()
if closing {
log.Debug("dropping message due to closing:\n%s", msg)
continue
}
if h.dispatchMsg(msg) {
closing = true
h.dispatchMsg(msg)
case <-h.reliableMsgsSema:
// get all the reliable messages
h.reliableMsgsLock.Lock()
msgs := h.reliableMsgs
h.reliableMsgs = nil
h.reliableMsgsLock.Unlock()
// fire all the reliable messages
for _, msg := range msgs {
h.metrics.pending.Dec()
h.dispatchMsg(msg)
}
case msg := <-h.msgChan:
if closing {
log.Debug("dropping internal message due to closing:\n%s", msg)
continue
}
if h.dispatchMsg(message{messageType: notifyMsg, notification: msg}) {
closing = true
}
// handle a message from the VM
h.dispatchMsg(message{messageType: notifyMsg, notification: msg})
}
if closing && h.toClose != nil {
if h.closing && h.toClose != nil {
go h.toClose()
}
}
@ -85,14 +97,19 @@ func (h *Handler) Dispatch() {
// Dispatch a message to the consensus engine.
// Returns true iff this consensus handler (and its associated engine) should shutdown
// (due to receipt of a shutdown message)
func (h *Handler) dispatchMsg(msg message) bool {
func (h *Handler) dispatchMsg(msg message) {
if h.closing {
h.ctx.Log.Debug("dropping message due to closing:\n%s", msg)
h.metrics.dropped.Inc()
return
}
startTime := time.Now()
ctx := h.engine.Context()
ctx.Lock.Lock()
defer ctx.Lock.Unlock()
h.ctx.Lock.Lock()
defer h.ctx.Lock.Unlock()
ctx.Log.Verbo("Forwarding message to consensus: %s", msg)
h.ctx.Log.Verbo("Forwarding message to consensus: %s", msg)
var (
err error
done bool
@ -159,9 +176,10 @@ func (h *Handler) dispatchMsg(msg message) bool {
}
if err != nil {
ctx.Log.Fatal("forcing chain to shutdown due to %s", err)
h.ctx.Log.Fatal("forcing chain to shutdown due to %s", err)
}
return done || err != nil
h.closing = done || err != nil
}
// GetAcceptedFrontier passes a GetAcceptedFrontier message received from the
@ -187,8 +205,8 @@ func (h *Handler) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, co
// GetAcceptedFrontierFailed passes a GetAcceptedFrontierFailed message received
// from the network to the consensus engine.
func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) GetAcceptedFrontierFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getAcceptedFrontierFailedMsg,
validatorID: validatorID,
requestID: requestID,
@ -219,14 +237,43 @@ func (h *Handler) Accepted(validatorID ids.ShortID, requestID uint32, containerI
// GetAcceptedFailed passes a GetAcceptedFailed message received from the
// network to the consensus engine.
func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) GetAcceptedFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getAcceptedFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool {
return h.sendMsg(message{
messageType: getAncestorsMsg,
validatorID: validatorID,
requestID: requestID,
containerID: containerID,
})
}
// MultiPut passes a MultiPut message received from the network to the consensus engine.
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) bool {
return h.sendMsg(message{
messageType: multiPutMsg,
validatorID: validatorID,
requestID: requestID,
containers: containers,
})
}
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// Get passes a Get message received from the network to the consensus engine.
func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool {
return h.sendMsg(message{
@ -237,16 +284,6 @@ func (h *Handler) Get(validatorID ids.ShortID, requestID uint32, containerID ids
})
}
// GetAncestors passes a GetAncestors message received from the network to the consensus engine.
func (h *Handler) GetAncestors(validatorID ids.ShortID, requestID uint32, containerID ids.ID) bool {
return h.sendMsg(message{
messageType: getAncestorsMsg,
validatorID: validatorID,
requestID: requestID,
containerID: containerID,
})
}
// Put passes a Put message received from the network to the consensus engine.
func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids.ID, container []byte) bool {
return h.sendMsg(message{
@ -258,34 +295,15 @@ func (h *Handler) Put(validatorID ids.ShortID, requestID uint32, containerID ids
})
}
// MultiPut passes a MultiPut message received from the network to the consensus engine.
func (h *Handler) MultiPut(validatorID ids.ShortID, requestID uint32, containers [][]byte) bool {
return h.sendMsg(message{
messageType: multiPutMsg,
validatorID: validatorID,
requestID: requestID,
containers: containers,
})
}
// GetFailed passes a GetFailed message to the consensus engine.
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) GetFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: getFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// GetAncestorsFailed passes a GetAncestorsFailed message to the consensus engine.
func (h *Handler) GetAncestorsFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
messageType: getAncestorsFailedMsg,
validatorID: validatorID,
requestID: requestID,
})
}
// PushQuery passes a PushQuery message received from the network to the consensus engine.
func (h *Handler) PushQuery(validatorID ids.ShortID, requestID uint32, blockID ids.ID, block []byte) bool {
return h.sendMsg(message{
@ -318,8 +336,8 @@ func (h *Handler) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set
}
// QueryFailed passes a QueryFailed message received from the network to the consensus engine.
func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) bool {
return h.sendMsg(message{
func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) {
h.sendReliableMsg(message{
messageType: queryFailedMsg,
validatorID: validatorID,
requestID: requestID,
@ -341,8 +359,9 @@ func (h *Handler) Notify(msg common.Message) bool {
// Shutdown shuts down the dispatcher
func (h *Handler) Shutdown() {
h.metrics.pending.Inc()
h.msgs <- message{messageType: shutdownMsg}
h.sendReliableMsg(message{
messageType: shutdownMsg,
})
}
func (h *Handler) sendMsg(msg message) bool {
@ -355,3 +374,15 @@ func (h *Handler) sendMsg(msg message) bool {
return false
}
}
func (h *Handler) sendReliableMsg(msg message) {
h.reliableMsgsLock.Lock()
defer h.reliableMsgsLock.Unlock()
h.metrics.pending.Inc()
h.reliableMsgs = append(h.reliableMsgs, msg)
select {
case h.reliableMsgsSema <- struct{}{}:
default:
}
}
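
The reliable-message path above appends failure and shutdown notifications to a mutex-guarded slice and nudges the dispatcher through a one-slot semaphore channel, so those messages are delivered even when the bounded msgs channel is full. A stripped-down sketch of that pattern (a string message type and a WaitGroup stand in for the real message and metrics plumbing):

// Stripped-down sketch of the reliable-message queue added above: appends
// never block, and the single-slot semaphore wakes the dispatcher at most once
// per batch of pending messages.
package main

import (
	"fmt"
	"sync"
)

type handler struct {
	mu           sync.Mutex
	reliableMsgs []string
	sema         chan struct{} // capacity 1: a wake-up signal, not a queue
}

func (h *handler) sendReliableMsg(msg string) {
	h.mu.Lock()
	defer h.mu.Unlock()
	h.reliableMsgs = append(h.reliableMsgs, msg)
	select {
	case h.sema <- struct{}{}: // wake the dispatcher if it is not already pending
	default: // a wake-up is already queued; this message joins the next batch
	}
}

func (h *handler) dispatch(wg *sync.WaitGroup) {
	for range h.sema {
		h.mu.Lock()
		msgs := h.reliableMsgs
		h.reliableMsgs = nil
		h.mu.Unlock()
		for _, msg := range msgs {
			fmt.Println("delivered:", msg)
			wg.Done()
		}
	}
}

func main() {
	h := &handler{sema: make(chan struct{}, 1)}
	var wg sync.WaitGroup
	wg.Add(2)
	go h.dispatch(&wg)

	h.sendReliableMsg("queryFailed")
	h.sendReliableMsg("shutdown")
	wg.Wait() // both messages arrive even though at most one wake-up was pending
}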

View File

@ -31,17 +31,16 @@ func (s *Sender) Context() *snow.Context { return s.ctx }
// GetAcceptedFrontier ...
func (s *Sender) GetAcceptedFrontier(validatorIDs ids.ShortSet, requestID uint32) {
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAcceptedFrontier(s.ctx.NodeID, s.ctx.ChainID, requestID)
}
validatorList := validatorIDs.List()
for _, validatorID := range validatorList {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAcceptedFrontierFailed(vID, s.ctx.ChainID, requestID)
})
}
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAcceptedFrontier(s.ctx.NodeID, s.ctx.ChainID, requestID)
}
s.sender.GetAcceptedFrontier(validatorIDs, s.ctx.ChainID, requestID)
}
@ -49,24 +48,23 @@ func (s *Sender) GetAcceptedFrontier(validatorIDs ids.ShortSet, requestID uint32
func (s *Sender) AcceptedFrontier(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
if validatorID.Equals(s.ctx.NodeID) {
go s.router.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs)
return
} else {
s.sender.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
s.sender.AcceptedFrontier(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
// GetAccepted ...
func (s *Sender) GetAccepted(validatorIDs ids.ShortSet, requestID uint32, containerIDs ids.Set) {
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAccepted(s.ctx.NodeID, s.ctx.ChainID, requestID, containerIDs)
}
validatorList := validatorIDs.List()
for _, validatorID := range validatorList {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAcceptedFailed(vID, s.ctx.ChainID, requestID)
})
}
if validatorIDs.Contains(s.ctx.NodeID) {
validatorIDs.Remove(s.ctx.NodeID)
go s.router.GetAccepted(s.ctx.NodeID, s.ctx.ChainID, requestID, containerIDs)
}
s.sender.GetAccepted(validatorIDs, s.ctx.ChainID, requestID, containerIDs)
}
@ -74,9 +72,9 @@ func (s *Sender) GetAccepted(validatorIDs ids.ShortSet, requestID uint32, contai
func (s *Sender) Accepted(validatorID ids.ShortID, requestID uint32, containerIDs ids.Set) {
if validatorID.Equals(s.ctx.NodeID) {
go s.router.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs)
return
} else {
s.sender.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
s.sender.Accepted(validatorID, s.ctx.ChainID, requestID, containerIDs)
}
// Get sends a Get message to the consensus engine running on the specified
@ -85,6 +83,13 @@ func (s *Sender) Accepted(validatorID ids.ShortID, requestID uint32, containerID
// specified container.
func (s *Sender) Get(validatorID ids.ShortID, requestID uint32, containerID ids.ID) {
s.ctx.Log.Verbo("Sending Get to validator %s. RequestID: %d. ContainerID: %s", validatorID, requestID, containerID)
// Sending a Get to myself will always fail
if validatorID.Equals(s.ctx.NodeID) {
go s.router.GetFailed(validatorID, s.ctx.ChainID, requestID)
return
}
// Add a timeout -- if we don't get a response before the timeout expires,
// send this consensus engine a GetFailed message
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
@ -101,6 +106,7 @@ func (s *Sender) GetAncestors(validatorID ids.ShortID, requestID uint32, contain
go s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
return
}
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.GetAncestorsFailed(validatorID, s.ctx.ChainID, requestID)
})
@ -130,6 +136,13 @@ func (s *Sender) MultiPut(validatorID ids.ShortID, requestID uint32, containers
// their preferred frontier given the existence of the specified container.
func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containerID ids.ID, container []byte) {
s.ctx.Log.Verbo("Sending PushQuery to validators %v. RequestID: %d. ContainerID: %s", validatorIDs, requestID, containerID)
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
// If one of the validators in [validatorIDs] is myself, send this message directly
// to my own router rather than sending it over the network
if validatorIDs.Contains(s.ctx.NodeID) { // One of the validators in [validatorIDs] was myself
@ -139,13 +152,7 @@ func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containe
// If this were not a goroutine, then we would deadlock here when [handler].msgs is full
go s.router.PushQuery(s.ctx.NodeID, s.ctx.ChainID, requestID, containerID, container)
}
validatorList := validatorIDs.List() // Convert set to list for easier iteration
for _, validatorID := range validatorList {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
s.sender.PushQuery(validatorIDs, s.ctx.ChainID, requestID, containerID, container)
}
@ -155,6 +162,14 @@ func (s *Sender) PushQuery(validatorIDs ids.ShortSet, requestID uint32, containe
// their preferred frontier.
func (s *Sender) PullQuery(validatorIDs ids.ShortSet, requestID uint32, containerID ids.ID) {
s.ctx.Log.Verbo("Sending PullQuery. RequestID: %d. ContainerID: %s", requestID, containerID)
for _, validatorID := range validatorIDs.List() {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
// If one of the validators in [validatorIDs] is myself, send this message directly
// to my own router rather than sending it over the network
if validatorIDs.Contains(s.ctx.NodeID) { // One of the validators in [validatorIDs] was myself
@ -164,13 +179,7 @@ func (s *Sender) PullQuery(validatorIDs ids.ShortSet, requestID uint32, containe
// If this were not a goroutine, then we would deadlock when [handler].msgs is full
go s.router.PullQuery(s.ctx.NodeID, s.ctx.ChainID, requestID, containerID)
}
validatorList := validatorIDs.List() // Convert set to list for easier iteration
for _, validatorID := range validatorList {
vID := validatorID
s.timeouts.Register(validatorID, s.ctx.ChainID, requestID, func() {
s.router.QueryFailed(vID, s.ctx.ChainID, requestID)
})
}
s.sender.PullQuery(validatorIDs, s.ctx.ChainID, requestID, containerID)
}
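
Registering the QueryFailed timeout for every queried validator before the query is dispatched, including the case where one of them is this node, is what guarantees each request eventually yields either a Chits or a QueryFailed. A toy sketch of that ordering, with time.AfterFunc standing in for the gecko timeout.Manager:

// Toy sketch of the ordering above: the failure timeout is registered first,
// so a query that is never answered still yields exactly one QueryFailed.
package main

import (
	"fmt"
	"sync"
	"time"
)

type timeouts struct {
	mu      sync.Mutex
	pending map[uint32]*time.Timer
}

func (t *timeouts) register(requestID uint32, d time.Duration, onFail func()) {
	t.mu.Lock()
	defer t.mu.Unlock()
	t.pending[requestID] = time.AfterFunc(d, onFail)
}

func (t *timeouts) cancel(requestID uint32) {
	t.mu.Lock()
	defer t.mu.Unlock()
	if timer, ok := t.pending[requestID]; ok {
		timer.Stop()
		delete(t.pending, requestID)
	}
}

func main() {
	tm := &timeouts{pending: map[uint32]*time.Timer{}}
	failed := make(chan uint32, 1)

	// Request 1: register the timeout, then "send" a query that is never answered.
	tm.register(1, 20*time.Millisecond, func() { failed <- 1 })

	// Request 2: register, "send", and answer it in time, cancelling the timeout.
	tm.register(2, 20*time.Millisecond, func() { failed <- 2 })
	tm.cancel(2)

	fmt.Println("QueryFailed for request", <-failed) // only request 1 fails
}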
@ -181,9 +190,9 @@ func (s *Sender) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set)
// to my own router rather than sending it over the network
if validatorID.Equals(s.ctx.NodeID) {
go s.router.Chits(validatorID, s.ctx.ChainID, requestID, votes)
return
} else {
s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes)
}
s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes)
}
// Gossip the provided container

View File

@ -4,18 +4,20 @@
package sender
import (
"math/rand"
"reflect"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/networking/timeout"
"github.com/ava-labs/gecko/utils/logging"
"github.com/prometheus/client_golang/prometheus"
)
func TestSenderContext(t *testing.T) {
@ -82,3 +84,128 @@ func TestTimeout(t *testing.T) {
t.Fatalf("Timeouts should have fired")
}
}
func TestReliableMessages(t *testing.T) {
tm := timeout.Manager{}
tm.Initialize(50 * time.Millisecond)
go tm.Dispatch()
chainRouter := router.ChainRouter{}
chainRouter.Initialize(logging.NoLog{}, &tm, time.Hour, time.Second)
sender := Sender{}
sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &chainRouter, &tm)
engine := common.EngineTest{T: t}
engine.Default(true)
engine.ContextF = snow.DefaultContextTest
engine.GossipF = func() error { return nil }
queriesToSend := 1000
awaiting := make([]chan struct{}, queriesToSend)
for i := 0; i < queriesToSend; i++ {
awaiting[i] = make(chan struct{}, 1)
}
engine.QueryFailedF = func(validatorID ids.ShortID, reqID uint32) error {
close(awaiting[int(reqID)])
return nil
}
handler := router.Handler{}
handler.Initialize(
&engine,
nil,
1,
"",
prometheus.NewRegistry(),
)
go handler.Dispatch()
chainRouter.AddChain(&handler)
go func() {
for i := 0; i < queriesToSend; i++ {
vdrIDs := ids.ShortSet{}
vdrIDs.Add(ids.NewShortID([20]byte{1}))
sender.PullQuery(vdrIDs, uint32(i), ids.Empty)
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
go func() {
for {
chainRouter.Gossip()
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
for _, await := range awaiting {
_, _ = <-await
}
}
func TestReliableMessagesToMyself(t *testing.T) {
tm := timeout.Manager{}
tm.Initialize(50 * time.Millisecond)
go tm.Dispatch()
chainRouter := router.ChainRouter{}
chainRouter.Initialize(logging.NoLog{}, &tm, time.Hour, time.Second)
sender := Sender{}
sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &chainRouter, &tm)
engine := common.EngineTest{T: t}
engine.Default(false)
engine.ContextF = snow.DefaultContextTest
engine.GossipF = func() error { return nil }
engine.CantPullQuery = false
queriesToSend := 2
awaiting := make([]chan struct{}, queriesToSend)
for i := 0; i < queriesToSend; i++ {
awaiting[i] = make(chan struct{}, 1)
}
engine.QueryFailedF = func(validatorID ids.ShortID, reqID uint32) error {
close(awaiting[int(reqID)])
return nil
}
handler := router.Handler{}
handler.Initialize(
&engine,
nil,
1,
"",
prometheus.NewRegistry(),
)
go handler.Dispatch()
chainRouter.AddChain(&handler)
go func() {
for i := 0; i < queriesToSend; i++ {
vdrIDs := ids.ShortSet{}
vdrIDs.Add(engine.Context().NodeID)
sender.PullQuery(vdrIDs, uint32(i), ids.Empty)
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
go func() {
for {
chainRouter.Gossip()
time.Sleep(time.Duration(rand.Float64() * float64(time.Microsecond)))
}
}()
for _, await := range awaiting {
_, _ = <-await
}
}