Merge pull request #129 from StephenButtolph/gossip

Gossip
This commit is contained in:
Stephen Buttolph 2020-05-07 12:59:46 -04:00 committed by GitHub
commit 394441cbb4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
22 changed files with 281 additions and 43 deletions

View File

@ -40,6 +40,7 @@ import (
const ( const (
defaultChannelSize = 1000 defaultChannelSize = 1000
requestTimeout = 2 * time.Second requestTimeout = 2 * time.Second
gossipFrequency = 10 * time.Second
) )
// Manager manages the chains running on this node. // Manager manages the chains running on this node.
@ -146,7 +147,7 @@ func New(
timeoutManager.Initialize(requestTimeout) timeoutManager.Initialize(requestTimeout)
go log.RecoverAndPanic(timeoutManager.Dispatch) go log.RecoverAndPanic(timeoutManager.Dispatch)
router.Initialize(log, &timeoutManager) router.Initialize(log, &timeoutManager, gossipFrequency)
m := &manager{ m := &manager{
stakingEnabled: stakingEnabled, stakingEnabled: stakingEnabled,

View File

@ -18,6 +18,7 @@ import "C"
import ( import (
"errors" "errors"
"fmt" "fmt"
"math"
"unsafe" "unsafe"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -29,9 +30,15 @@ import (
"github.com/ava-labs/gecko/snow/validators" "github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils/formatting" "github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/logging" "github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/random"
"github.com/ava-labs/gecko/utils/timer" "github.com/ava-labs/gecko/utils/timer"
) )
// GossipSize is the maximum number of peers to gossip a container to
const (
GossipSize = 50
)
var ( var (
// VotingNet implements the SenderExternal interface. // VotingNet implements the SenderExternal interface.
VotingNet = Voting{} VotingNet = Voting{}
@ -89,34 +96,7 @@ func (s *Voting) Shutdown() { s.executor.Stop() }
// Accept is called after every consensus decision // Accept is called after every consensus decision
func (s *Voting) Accept(chainID, containerID ids.ID, container []byte) error { func (s *Voting) Accept(chainID, containerID ids.ID, container []byte) error {
peers := []salticidae.PeerID(nil) return s.gossip(chainID, containerID, container)
allPeers, allIDs, _ := s.conns.Conns()
for i, id := range allIDs {
if !s.vdrs.Contains(id) {
peers = append(peers, allPeers[i])
}
}
build := Builder{}
msg, err := build.Put(chainID, 0, containerID, container)
if err != nil {
return fmt.Errorf("Attempted to pack too large of a Put message.\nContainer length: %d: %w", len(container), err)
}
s.log.Verbo("Sending a Put message to non-validators."+
"\nNumber of Non-Validators: %d"+
"\nChain: %s"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
len(peers),
chainID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, peers...)
s.numPutSent.Add(float64(len(peers)))
return nil
} }
// GetAcceptedFrontier implements the Sender interface. // GetAcceptedFrontier implements the Sender interface.
@ -412,6 +392,13 @@ func (s *Voting) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32
s.numChitsSent.Inc() s.numChitsSent.Inc()
} }
// Gossip attempts to gossip the container to the network
func (s *Voting) Gossip(chainID, containerID ids.ID, container []byte) {
if err := s.gossip(chainID, containerID, container); err != nil {
s.log.Error("Error gossiping container %s to %s\n%s", containerID, chainID, err)
}
}
func (s *Voting) send(msg Msg, peers ...salticidae.PeerID) { func (s *Voting) send(msg Msg, peers ...salticidae.PeerID) {
ds := msg.DataStream() ds := msg.DataStream()
defer ds.Free() defer ds.Free()
@ -429,6 +416,41 @@ func (s *Voting) send(msg Msg, peers ...salticidae.PeerID) {
} }
} }
func (s *Voting) gossip(chainID, containerID ids.ID, container []byte) error {
allPeers := s.conns.PeerIDs()
numToGossip := GossipSize
if numToGossip > len(allPeers) {
numToGossip = len(allPeers)
}
peers := make([]salticidae.PeerID, numToGossip)
sampler := random.Uniform{N: len(allPeers)}
for i := range peers {
peers[i] = allPeers[sampler.Sample()]
}
build := Builder{}
msg, err := build.Put(chainID, math.MaxUint32, containerID, container)
if err != nil {
return fmt.Errorf("Attempted to pack too large of a Put message.\nContainer length: %d: %w", len(container), err)
}
s.log.Verbo("Sending a Put message to peers."+
"\nNumber of Peers: %d"+
"\nChain: %s"+
"\nContainer ID: %s"+
"\nContainer:\n%s",
len(peers),
chainID,
containerID,
formatting.DumpBytes{Bytes: container},
)
s.send(msg, peers...)
s.numPutSent.Add(float64(len(peers)))
return nil
}
// getAcceptedFrontier handles the recept of a getAcceptedFrontier container // getAcceptedFrontier handles the recept of a getAcceptedFrontier container
// message for a chain // message for a chain
//export getAcceptedFrontier //export getAcceptedFrontier

View File

@ -8,6 +8,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -60,7 +61,7 @@ func newConfig(t *testing.T) (BootstrapConfig, ids.ShortID, *common.SenderTest,
handler.Initialize(engine, make(chan common.Message), 1) handler.Initialize(engine, make(chan common.Message), 1)
timeouts.Initialize(0) timeouts.Initialize(0)
router.Initialize(ctx.Log, timeouts) router.Initialize(ctx.Log, timeouts, time.Hour)
vtxBlocker, _ := queue.New(prefixdb.New([]byte("vtx"), db)) vtxBlocker, _ := queue.New(prefixdb.New([]byte("vtx"), db))
txBlocker, _ := queue.New(prefixdb.New([]byte("tx"), db)) txBlocker, _ := queue.New(prefixdb.New([]byte("tx"), db))

View File

@ -62,6 +62,26 @@ func (t *Transitive) finishBootstrapping() {
t.bootstrapped = true t.bootstrapped = true
} }
// Gossip implements the Engine interface
func (t *Transitive) Gossip() {
edge := t.Config.State.Edge()
if len(edge) == 0 {
t.Config.Context.Log.Debug("Dropping gossip request as no vertices have been accepted")
return
}
sampler := random.Uniform{N: len(edge)}
vtxID := edge[sampler.Sample()]
vtx, err := t.Config.State.GetVertex(vtxID)
if err != nil {
t.Config.Context.Log.Warn("Dropping gossip request as %s couldn't be loaded due to %s", vtxID, err)
return
}
t.Config.Context.Log.Debug("Gossiping %s as accepted to the network", vtxID)
t.Config.Sender.Gossip(vtxID, vtx.Bytes())
}
// Shutdown implements the Engine interface // Shutdown implements the Engine interface
func (t *Transitive) Shutdown() { func (t *Transitive) Shutdown() {
t.Config.Context.Log.Info("Shutting down Avalanche consensus") t.Config.Context.Log.Info("Shutting down Avalanche consensus")

View File

@ -2536,3 +2536,54 @@ func TestEnginePartiallyValidVertex(t *testing.T) {
te.insert(vtx) te.insert(vtx)
} }
func TestEngineGossip(t *testing.T) {
config := DefaultConfig()
sender := &common.SenderTest{}
sender.T = t
config.Sender = sender
sender.Default(true)
st := &stateTest{t: t}
config.State = st
gVtx := &Vtx{
id: GenerateID(),
status: choices.Accepted,
}
te := &Transitive{}
te.Initialize(config)
te.finishBootstrapping()
st.edge = func() []ids.ID { return []ids.ID{gVtx.ID()} }
st.getVertex = func(vtxID ids.ID) (avalanche.Vertex, error) {
switch {
case vtxID.Equals(gVtx.ID()):
return gVtx, nil
}
t.Fatal(errUnknownVertex)
return nil, errUnknownVertex
}
called := new(bool)
sender.GossipF = func(vtxID ids.ID, vtxBytes []byte) {
*called = true
switch {
case !vtxID.Equals(gVtx.ID()):
t.Fatal(errUnknownVertex)
}
switch {
case !bytes.Equal(vtxBytes, gVtx.Bytes()):
t.Fatal(errUnknownVertex)
}
}
te.Gossip()
if !*called {
t.Fatalf("Should have gossiped the vertex")
}
}

View File

@ -112,6 +112,9 @@ type InternalHandler interface {
// Startup this engine. // Startup this engine.
Startup() Startup()
// Gossip to the network a container on the accepted frontier
Gossip()
// Shutdown this engine. // Shutdown this engine.
Shutdown() Shutdown()

View File

@ -14,6 +14,7 @@ type Sender interface {
AcceptedSender AcceptedSender
FetchSender FetchSender
QuerySender QuerySender
Gossiper
} }
// FrontierSender defines how a consensus engine sends frontier messages to // FrontierSender defines how a consensus engine sends frontier messages to
@ -70,3 +71,10 @@ type QuerySender interface {
// Chits sends chits to the specified validator // Chits sends chits to the specified validator
Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set)
} }
// Gossiper defines how a consensus engine gossips a container on the accepted
// frontier to other validators
type Gossiper interface {
// Gossip gossips the provided container throughout the network
Gossip(containerID ids.ID, container []byte)
}

View File

@ -15,6 +15,7 @@ type EngineTest struct {
T *testing.T T *testing.T
CantStartup, CantStartup,
CantGossip,
CantShutdown, CantShutdown,
CantContext, CantContext,
@ -38,7 +39,7 @@ type EngineTest struct {
CantQueryFailed, CantQueryFailed,
CantChits bool CantChits bool
StartupF, ShutdownF func() StartupF, GossipF, ShutdownF func()
ContextF func() *snow.Context ContextF func() *snow.Context
NotifyF func(Message) NotifyF func(Message)
GetF, GetFailedF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID) GetF, GetFailedF, PullQueryF func(validatorID ids.ShortID, requestID uint32, containerID ids.ID)
@ -50,6 +51,7 @@ type EngineTest struct {
// Default ... // Default ...
func (e *EngineTest) Default(cant bool) { func (e *EngineTest) Default(cant bool) {
e.CantStartup = cant e.CantStartup = cant
e.CantGossip = cant
e.CantShutdown = cant e.CantShutdown = cant
e.CantContext = cant e.CantContext = cant
@ -83,6 +85,15 @@ func (e *EngineTest) Startup() {
} }
} }
// Gossip ...
func (e *EngineTest) Gossip() {
if e.GossipF != nil {
e.GossipF()
} else if e.CantGossip && e.T != nil {
e.T.Fatalf("Unexpectedly called Gossip")
}
}
// Shutdown ... // Shutdown ...
func (e *EngineTest) Shutdown() { func (e *EngineTest) Shutdown() {
if e.ShutdownF != nil { if e.ShutdownF != nil {

View File

@ -16,7 +16,8 @@ type SenderTest struct {
CantGetAcceptedFrontier, CantAcceptedFrontier, CantGetAcceptedFrontier, CantAcceptedFrontier,
CantGetAccepted, CantAccepted, CantGetAccepted, CantAccepted,
CantGet, CantPut, CantGet, CantPut,
CantPullQuery, CantPushQuery, CantChits bool CantPullQuery, CantPushQuery, CantChits,
CantGossip bool
GetAcceptedFrontierF func(ids.ShortSet, uint32) GetAcceptedFrontierF func(ids.ShortSet, uint32)
AcceptedFrontierF func(ids.ShortID, uint32, ids.Set) AcceptedFrontierF func(ids.ShortID, uint32, ids.Set)
@ -27,6 +28,7 @@ type SenderTest struct {
PushQueryF func(ids.ShortSet, uint32, ids.ID, []byte) PushQueryF func(ids.ShortSet, uint32, ids.ID, []byte)
PullQueryF func(ids.ShortSet, uint32, ids.ID) PullQueryF func(ids.ShortSet, uint32, ids.ID)
ChitsF func(ids.ShortID, uint32, ids.Set) ChitsF func(ids.ShortID, uint32, ids.Set)
GossipF func(ids.ID, []byte)
} }
// Default set the default callable value to [cant] // Default set the default callable value to [cant]
@ -40,6 +42,7 @@ func (s *SenderTest) Default(cant bool) {
s.CantPullQuery = cant s.CantPullQuery = cant
s.CantPushQuery = cant s.CantPushQuery = cant
s.CantChits = cant s.CantChits = cant
s.CantGossip = cant
} }
// GetAcceptedFrontier calls GetAcceptedFrontierF if it was initialized. If it // GetAcceptedFrontier calls GetAcceptedFrontierF if it was initialized. If it
@ -140,3 +143,14 @@ func (s *SenderTest) Chits(vdr ids.ShortID, requestID uint32, votes ids.Set) {
s.T.Fatalf("Unexpectedly called Chits") s.T.Fatalf("Unexpectedly called Chits")
} }
} }
// Gossip calls GossipF if it was initialized. If it wasn't initialized and this
// function shouldn't be called and testing was initialized, then testing will
// fail.
func (s *SenderTest) Gossip(containerID ids.ID, container []byte) {
if s.GossipF != nil {
s.GossipF(containerID, container)
} else if s.CantGossip && s.T != nil {
s.T.Fatalf("Unexpectedly called Gossip")
}
}

View File

@ -8,6 +8,7 @@ import (
"errors" "errors"
"fmt" "fmt"
"testing" "testing"
"time"
"github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus"
@ -54,7 +55,7 @@ func newConfig(t *testing.T) (BootstrapConfig, ids.ShortID, *common.SenderTest,
handler.Initialize(engine, make(chan common.Message), 1) handler.Initialize(engine, make(chan common.Message), 1)
timeouts.Initialize(0) timeouts.Initialize(0)
router.Initialize(ctx.Log, timeouts) router.Initialize(ctx.Log, timeouts, time.Hour)
blocker, _ := queue.New(db) blocker, _ := queue.New(db)

View File

@ -65,6 +65,19 @@ func (t *Transitive) finishBootstrapping() {
} }
} }
// Gossip implements the Engine interface
func (t *Transitive) Gossip() {
blkID := t.Config.VM.LastAccepted()
blk, err := t.Config.VM.GetBlock(blkID)
if err != nil {
t.Config.Context.Log.Warn("Dropping gossip request as %s couldn't be loaded due to %s", blkID, err)
return
}
t.Config.Context.Log.Debug("Gossiping %s as accepted to the network", blkID)
t.Config.Sender.Gossip(blkID, blk.Bytes())
}
// Shutdown implements the Engine interface // Shutdown implements the Engine interface
func (t *Transitive) Shutdown() { func (t *Transitive) Shutdown() {
t.Config.Context.Log.Info("Shutting down Snowman consensus") t.Config.Context.Log.Info("Shutting down Snowman consensus")

View File

@ -1187,3 +1187,36 @@ func TestEngineUndeclaredDependencyDeadlock(t *testing.T) {
t.Fatalf("Should have bubbled invalid votes to the valid parent") t.Fatalf("Should have bubbled invalid votes to the valid parent")
} }
} }
func TestEngineGossip(t *testing.T) {
_, _, sender, vm, te, gBlk := setup(t)
vm.LastAcceptedF = func() ids.ID { return gBlk.ID() }
vm.GetBlockF = func(blkID ids.ID) (snowman.Block, error) {
switch {
case blkID.Equals(gBlk.ID()):
return gBlk, nil
}
t.Fatal(errUnknownBlock)
return nil, errUnknownBlock
}
called := new(bool)
sender.GossipF = func(blkID ids.ID, blkBytes []byte) {
*called = true
switch {
case !blkID.Equals(gBlk.ID()):
t.Fatal(errUnknownBlock)
}
switch {
case !bytes.Equal(blkBytes, gBlk.Bytes()):
t.Fatal(errUnknownBytes)
}
}
te.Gossip()
if !*called {
t.Fatalf("Should have gossiped the block")
}
}

View File

@ -91,6 +91,8 @@ func (h *Handler) dispatchMsg(msg message) bool {
h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs) h.engine.Chits(msg.validatorID, msg.requestID, msg.containerIDs)
case notifyMsg: case notifyMsg:
h.engine.Notify(msg.notification) h.engine.Notify(msg.notification)
case gossipMsg:
h.engine.Gossip()
case shutdownMsg: case shutdownMsg:
h.engine.Shutdown() h.engine.Shutdown()
return false return false
@ -232,6 +234,9 @@ func (h *Handler) QueryFailed(validatorID ids.ShortID, requestID uint32) {
} }
} }
// Gossip passes a gossip request to the consensus engine
func (h *Handler) Gossip() { h.msgs <- message{messageType: gossipMsg} }
// Shutdown shuts down the dispatcher // Shutdown shuts down the dispatcher
func (h *Handler) Shutdown() { h.msgs <- message{messageType: shutdownMsg}; h.wg.Wait() } func (h *Handler) Shutdown() { h.msgs <- message{messageType: shutdownMsg}; h.wg.Wait() }

View File

@ -29,6 +29,7 @@ const (
chitsMsg chitsMsg
queryFailedMsg queryFailedMsg
notifyMsg notifyMsg
gossipMsg
shutdownMsg shutdownMsg
) )
@ -87,6 +88,8 @@ func (t msgType) String() string {
return "Query Failed Message" return "Query Failed Message"
case notifyMsg: case notifyMsg:
return "Notify Message" return "Notify Message"
case gossipMsg:
return "Gossip Message"
case shutdownMsg: case shutdownMsg:
return "Shutdown Message" return "Shutdown Message"
default: default:

View File

@ -4,6 +4,8 @@
package router package router
import ( import (
"time"
"github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/handler" "github.com/ava-labs/gecko/snow/networking/handler"
"github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/networking/timeout"
@ -19,7 +21,7 @@ type Router interface {
AddChain(chain *handler.Handler) AddChain(chain *handler.Handler)
RemoveChain(chainID ids.ID) RemoveChain(chainID ids.ID)
Shutdown() Shutdown()
Initialize(log logging.Logger, timeouts *timeout.Manager) Initialize(log logging.Logger, timeouts *timeout.Manager, gossipFrequency time.Duration)
} }
// ExternalRouter routes messages from the network to the // ExternalRouter routes messages from the network to the

View File

@ -5,11 +5,13 @@ package router
import ( import (
"sync" "sync"
"time"
"github.com/ava-labs/gecko/ids" "github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/handler" "github.com/ava-labs/gecko/snow/networking/handler"
"github.com/ava-labs/gecko/snow/networking/timeout" "github.com/ava-labs/gecko/snow/networking/timeout"
"github.com/ava-labs/gecko/utils/logging" "github.com/ava-labs/gecko/utils/logging"
"github.com/ava-labs/gecko/utils/timer"
) )
// ChainRouter routes incoming messages from the validator network // ChainRouter routes incoming messages from the validator network
@ -21,15 +23,24 @@ type ChainRouter struct {
lock sync.RWMutex lock sync.RWMutex
chains map[[32]byte]*handler.Handler chains map[[32]byte]*handler.Handler
timeouts *timeout.Manager timeouts *timeout.Manager
gossiper *timer.Repeater
} }
// Initialize the router // Initialize the router.
// When this router receives an incoming message, it cancels the timeout in [timeouts] //
// associated with the request that caused the incoming message, if applicable // When this router receives an incoming message, it cancels the timeout in
func (sr *ChainRouter) Initialize(log logging.Logger, timeouts *timeout.Manager) { // [timeouts] associated with the request that caused the incoming message, if
// applicable.
//
// This router also fires a gossip event every [gossipFrequency] to the engine,
// notifying the engine it should gossip it's accepted set.
func (sr *ChainRouter) Initialize(log logging.Logger, timeouts *timeout.Manager, gossipFrequency time.Duration) {
sr.log = log sr.log = log
sr.chains = make(map[[32]byte]*handler.Handler) sr.chains = make(map[[32]byte]*handler.Handler)
sr.timeouts = timeouts sr.timeouts = timeouts
sr.gossiper = timer.NewRepeater(sr.Gossip, gossipFrequency)
go log.RecoverAndPanic(sr.gossiper.Dispatch)
} }
// AddChain registers the specified chain so that incoming // AddChain registers the specified chain so that incoming
@ -255,4 +266,19 @@ func (sr *ChainRouter) shutdown() {
for _, chain := range sr.chains { for _, chain := range sr.chains {
chain.Shutdown() chain.Shutdown()
} }
sr.gossiper.Stop()
}
// Gossip accepted containers
func (sr *ChainRouter) Gossip() {
sr.lock.RLock()
defer sr.lock.RUnlock()
sr.gossip()
}
func (sr *ChainRouter) gossip() {
for _, chain := range sr.chains {
chain.Gossip()
}
} }

View File

@ -20,4 +20,6 @@ type ExternalSender interface {
PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID)
Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set)
Gossip(chainID ids.ID, containerID ids.ID, container []byte)
} }

View File

@ -163,3 +163,9 @@ func (s *Sender) Chits(validatorID ids.ShortID, requestID uint32, votes ids.Set)
} }
s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes) s.sender.Chits(validatorID, s.ctx.ChainID, requestID, votes)
} }
// Gossip the provided container
func (s *Sender) Gossip(containerID ids.ID, container []byte) {
s.ctx.Log.Verbo("Gossiping %s", containerID)
s.sender.Gossip(s.ctx.ChainID, containerID, container)
}

View File

@ -38,7 +38,7 @@ func TestTimeout(t *testing.T) {
go tm.Dispatch() go tm.Dispatch()
router := router.ChainRouter{} router := router.ChainRouter{}
router.Initialize(logging.NoLog{}, &tm) router.Initialize(logging.NoLog{}, &tm, time.Hour)
sender := Sender{} sender := Sender{}
sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &router, &tm) sender.Initialize(snow.DefaultContextTest(), &ExternalSenderTest{}, &router, &tm)

View File

@ -17,7 +17,8 @@ type ExternalSenderTest struct {
CantGetAcceptedFrontier, CantAcceptedFrontier, CantGetAcceptedFrontier, CantAcceptedFrontier,
CantGetAccepted, CantAccepted, CantGetAccepted, CantAccepted,
CantGet, CantPut, CantGet, CantPut,
CantPullQuery, CantPushQuery, CantChits bool CantPullQuery, CantPushQuery, CantChits,
CantGossip bool
GetAcceptedFrontierF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) GetAcceptedFrontierF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32)
AcceptedFrontierF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) AcceptedFrontierF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set)
@ -28,6 +29,7 @@ type ExternalSenderTest struct {
PushQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) PushQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte)
PullQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) PullQueryF func(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID)
ChitsF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) ChitsF func(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set)
GossipF func(chainID ids.ID, containerID ids.ID, container []byte)
} }
// Default set the default callable value to [cant] // Default set the default callable value to [cant]
@ -41,6 +43,7 @@ func (s *ExternalSenderTest) Default(cant bool) {
s.CantPullQuery = cant s.CantPullQuery = cant
s.CantPushQuery = cant s.CantPushQuery = cant
s.CantChits = cant s.CantChits = cant
s.CantGossip = cant
} }
// GetAcceptedFrontier calls GetAcceptedFrontierF if it was initialized. If it // GetAcceptedFrontier calls GetAcceptedFrontierF if it was initialized. If it
@ -159,3 +162,16 @@ func (s *ExternalSenderTest) Chits(vdr ids.ShortID, chainID ids.ID, requestID ui
s.B.Fatalf("Unexpectedly called Chits") s.B.Fatalf("Unexpectedly called Chits")
} }
} }
// Gossip calls GossipF if it was initialized. If it wasn't initialized and this
// function shouldn't be called and testing was initialized, then testing will
// fail.
func (s *ExternalSenderTest) Gossip(chainID ids.ID, containerID ids.ID, container []byte) {
if s.GossipF != nil {
s.GossipF(chainID, containerID, container)
} else if s.CantGossip && s.T != nil {
s.T.Fatalf("Unexpectedly called Gossip")
} else if s.CantGossip && s.B != nil {
s.B.Fatalf("Unexpectedly called Gossip")
}
}

View File

@ -1510,7 +1510,7 @@ func TestBootstrapPartiallyAccepted(t *testing.T) {
go timeoutManager.Dispatch() go timeoutManager.Dispatch()
router := &router.ChainRouter{} router := &router.ChainRouter{}
router.Initialize(logging.NoLog{}, &timeoutManager) router.Initialize(logging.NoLog{}, &timeoutManager, time.Hour)
externalSender := &sender.ExternalSenderTest{T: t} externalSender := &sender.ExternalSenderTest{T: t}
externalSender.Default(true) externalSender.Default(true)

View File

@ -62,7 +62,7 @@ func ConsensusLeader(numBlocks, numTxsPerBlock int, b *testing.B) {
go timeoutManager.Dispatch() go timeoutManager.Dispatch()
router := &router.ChainRouter{} router := &router.ChainRouter{}
router.Initialize(logging.NoLog{}, &timeoutManager) router.Initialize(logging.NoLog{}, &timeoutManager, time.Hour)
// Initialize the VM // Initialize the VM
vm := &VM{} vm := &VM{}
@ -189,7 +189,7 @@ func ConsensusFollower(numBlocks, numTxsPerBlock int, b *testing.B) {
go timeoutManager.Dispatch() go timeoutManager.Dispatch()
router := &router.ChainRouter{} router := &router.ChainRouter{}
router.Initialize(logging.NoLog{}, &timeoutManager) router.Initialize(logging.NoLog{}, &timeoutManager, time.Hour)
wg := sync.WaitGroup{} wg := sync.WaitGroup{}
wg.Add(numBlocks) wg.Add(numBlocks)