Commented network and added onAccept gossiping

This commit is contained in:
StephenButtolph 2020-05-22 16:04:48 -04:00
parent 3892a62f28
commit b3baae5be4
6 changed files with 402 additions and 297 deletions

View File

@ -21,6 +21,7 @@ var (
type Codec struct{}
// Pack attempts to pack a map of fields into a message.
// The first byte of the message is the opcode of the message.
func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) {
message, ok := Messages[op]
if !ok {
@ -45,6 +46,7 @@ func (Codec) Pack(op uint8, fields map[Field]interface{}) (Msg, error) {
}
// Parse attempts to convert bytes into a message.
// The first byte of the message is the opcode of the message.
func (Codec) Parse(b []byte) (Msg, error) {
p := wrappers.Packer{Bytes: b}
op := p.UnpackByte()

View File

@ -9,7 +9,7 @@ import (
"github.com/ava-labs/gecko/utils"
)
// Dialer ...
// Dialer attempts to create a connection with the provided IP/port pair
type Dialer interface {
Dial(utils.IPDesc) (net.Conn, error)
}
@ -18,7 +18,8 @@ type dialer struct {
network string
}
// NewDialer ...
// NewDialer returns a new Dialer that calls `net.Dial` with the provided
// network.
func NewDialer(network string) Dialer { return &dialer{network: network} }
func (d *dialer) Dial(ip utils.IPDesc) (net.Conn, error) { return net.Dial(d.network, ip.String()) }

View File

@ -6,7 +6,7 @@ package network
import "github.com/ava-labs/gecko/ids"
// Handler represents a handler that is called when a connection is marked as
// connected and disconnected
// connected or disconnected
type Handler interface {
// returns true if the handler should be removed
Connected(id ids.ShortID) bool

View File

@ -13,6 +13,7 @@ import (
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/networking/sender"
"github.com/ava-labs/gecko/snow/triggers"
"github.com/ava-labs/gecko/snow/validators"
"github.com/ava-labs/gecko/utils"
"github.com/ava-labs/gecko/utils/logging"
@ -35,22 +36,38 @@ const (
defaultGossipSize = 50
)
// Network ...
// Network defines the functionality of the networking library.
type Network interface {
// All consensus messages can be sent through this interface. Thread safety
// must be managed internally in the network.
sender.ExternalSender
// Returns non-nil error
// The network must be able to broadcast accepted decisions to random peers.
// Thread safety must be managed internally in the network.
triggers.Acceptor
// Should only be called once, will run until either a fatal error occurs,
// or the network is closed. Returns a non-nil error.
Dispatch() error
// Must be thread safe. Will never stop attempting to connect to this IP
// Attempt to connect to this IP. Thread safety must be managed internally
// to the network. The network will never stop attempting to connect to this
// IP.
Track(ip utils.IPDesc)
// Calls Connected on all the currently connected peers
// Register a new handler that is called whenever a peer is connected to or
// disconnected to. If the handler returns true, then it will never be
// called again. Thread safety must be managed internally in the network.
// The handler will initially be called with this local node's ID.
RegisterHandler(h Handler)
// Returns the IPs of nodes this network is currently connected to
// externally. Thread safety must be managed internally to the network.
IPs() []utils.IPDesc
// Must be thread safe
// Close this network and all existing connections it has. Thread safety
// must be managed internally to the network. Calling close multiple times
// will return a nil error.
Close() error
}
@ -66,7 +83,7 @@ type network struct {
serverUpgrader Upgrader
clientUpgrader Upgrader
vdrs validators.Set // set of current validators in the AVAnet
router router.Router
router router.Router // router must be thread safe
clock timer.Clock
@ -94,7 +111,8 @@ type network struct {
handlers []Handler
}
// NewDefaultNetwork ...
// NewDefaultNetwork returns a new Network implementation with the provided
// parameters and some reasonable default values.
func NewDefaultNetwork(
log logging.Logger,
id ids.ShortID,
@ -136,7 +154,7 @@ func NewDefaultNetwork(
)
}
// NewNetwork ...
// NewNetwork returns a new Network implementation with the provided parameters.
func NewNetwork(
log logging.Logger,
id ids.ShortID,
@ -195,6 +213,214 @@ func NewNetwork(
return net
}
// GetAcceptedFrontier implements the Sender interface.
func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
msg, err := n.b.GetAcceptedFrontier(chainID, requestID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
}
}
}
// AcceptedFrontier implements the Sender interface.
func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.AcceptedFrontier(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d",
containerIDs.Len())
return // Packing message failed
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send an AcceptedFrontier message to: %s", validatorID)
}
}
// GetAccepted implements the Sender interface.
func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.GetAccepted(chainID, requestID, containerIDs)
if err != nil {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
}
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
}
}
}
// Accepted implements the Sender interface.
func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.Accepted(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d",
containerIDs.Len())
return // Packing message failed
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send an Accepted message to: %s", validatorID)
}
}
// Get implements the Sender interface.
func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.Get(chainID, requestID, containerID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a Get message to: %s", validatorID)
}
}
// Put implements the Sender interface.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.Put(chainID, requestID, containerID, container)
if err != nil {
n.log.Error("failed to build Put message because of container of size %d", len(container))
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a Put message to: %s", validatorID)
}
}
// PushQuery implements the Sender interface.
func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.PushQuery(chainID, requestID, containerID, container)
if err != nil {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
n.log.Error("attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
return // Packing message failed
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed sending a PushQuery message to: %s", vID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
}
}
// PullQuery implements the Sender interface.
func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.PullQuery(chainID, requestID, containerID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed sending a PullQuery message to: %s", vID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
}
}
// Chits implements the Sender interface.
func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
msg, err := n.b.Chits(chainID, requestID, votes)
if err != nil {
n.log.Error("failed to build Chits message because of %d votes", votes.Len())
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.send(msg)
}
if !sent {
n.log.Debug("failed to send a Chits message to: %s", validatorID)
}
}
// Gossip attempts to gossip the container to the network
func (n *network) Gossip(chainID, containerID ids.ID, container []byte) {
if err := n.gossipContainer(chainID, containerID, container); err != nil {
n.log.Error("error gossiping container %s to %s: %s", containerID, chainID, err)
}
}
// Accept is called after every consensus decision
func (n *network) Accept(chainID, containerID ids.ID, container []byte) error {
return n.gossipContainer(chainID, containerID, container)
}
// Dispatch starts accepting connections from other nodes attempting to connect
// to this node.
func (n *network) Dispatch() error {
go n.gossip()
for {
@ -209,6 +435,7 @@ func (n *network) Dispatch() error {
}
}
// RegisterHandler implements the Network interface
func (n *network) RegisterHandler(h Handler) {
n.stateLock.Lock()
defer n.stateLock.Unlock()
@ -226,6 +453,7 @@ func (n *network) RegisterHandler(h Handler) {
n.handlers = append(n.handlers, h)
}
// IPs implements the Network interface
func (n *network) IPs() []utils.IPDesc {
n.stateLock.Lock()
defer n.stateLock.Unlock()
@ -239,6 +467,78 @@ func (n *network) IPs() []utils.IPDesc {
return ips
}
// Close implements the Network interface
func (n *network) Close() error {
n.stateLock.Lock()
if n.closed {
n.stateLock.Unlock()
return nil
}
n.closed = true
err := n.listener.Close()
n.stateLock.Unlock()
for _, peer := range n.peers {
peer.Close() // Grabs the stateLock
}
return err
}
// Track implements the Network interface
func (n *network) Track(ip utils.IPDesc) {
n.stateLock.Lock()
defer n.stateLock.Unlock()
n.track(ip)
}
// assumes the stateLock is not held.
func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) error {
msg, err := n.b.Put(chainID, math.MaxUint32, containerID, container)
if err != nil {
return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
allPeers := make([]*peer, 0, len(n.peers))
for _, peer := range n.peers {
allPeers = append(allPeers, peer)
}
numToGossip := n.gossipSize
if numToGossip > len(allPeers) {
numToGossip = len(allPeers)
}
sampler := random.Uniform{N: len(allPeers)}
for i := 0; i < numToGossip; i++ {
allPeers[sampler.Sample()].send(msg)
}
return nil
}
// assumes the stateLock is held.
func (n *network) track(ip utils.IPDesc) {
if n.closed {
return
}
str := ip.String()
if _, ok := n.disconnectedIPs[str]; ok {
return
}
if _, ok := n.connectedIPs[str]; ok {
return
}
n.disconnectedIPs[str] = struct{}{}
go n.connectTo(ip)
}
// assumes the stateLock is not held. Only returns after the network is closed.
func (n *network) gossip() {
t := time.NewTicker(n.peerListGossipSpacing)
defer t.Stop()
@ -282,58 +582,19 @@ func (n *network) gossip() {
sampler := random.Uniform{N: len(stakers)}
for i := 0; i < numStakersToSend; i++ {
stakers[sampler.Sample()].Send(msg)
stakers[sampler.Sample()].send(msg)
}
sampler.N = len(nonStakers)
sampler.Replace()
for i := 0; i < numNonStakersToSend; i++ {
nonStakers[sampler.Sample()].Send(msg)
nonStakers[sampler.Sample()].send(msg)
}
n.stateLock.Unlock()
}
}
func (n *network) Close() error {
n.stateLock.Lock()
if n.closed {
n.stateLock.Unlock()
return nil
}
n.closed = true
err := n.listener.Close()
n.stateLock.Unlock()
for _, peer := range n.peers {
peer.Close() // Grabs the stateLock
}
return err
}
func (n *network) Track(ip utils.IPDesc) {
n.stateLock.Lock()
defer n.stateLock.Unlock()
n.track(ip)
}
func (n *network) track(ip utils.IPDesc) {
if n.closed {
return
}
str := ip.String()
if _, ok := n.disconnectedIPs[str]; ok {
return
}
if _, ok := n.connectedIPs[str]; ok {
return
}
n.disconnectedIPs[str] = struct{}{}
go n.connectTo(ip)
}
// assumes the stateLock is not held. Only returns if the ip is connected to or
// the network is closed
func (n *network) connectTo(ip utils.IPDesc) {
str := ip.String()
delay := n.initialReconnectDelay
@ -341,11 +602,15 @@ func (n *network) connectTo(ip utils.IPDesc) {
n.stateLock.Lock()
_, isDisconnected := n.disconnectedIPs[str]
_, isConnected := n.connectedIPs[str]
closed := n.closed
n.stateLock.Unlock()
if !isDisconnected || isConnected {
if !isDisconnected || isConnected || closed {
// If the IP was discovered by that peer connecting to us, we don't
// need to attempt to connect anymore
// If the network was closed, we should stop attempting to connect
// to the peer
return
}
@ -353,7 +618,8 @@ func (n *network) connectTo(ip utils.IPDesc) {
if err == nil {
return
}
// TODO: Log error
n.log.Verbo("error attempting to connect to %s: %s. Reattempting in %s",
ip, err, delay)
time.Sleep(delay)
delay *= 2
@ -363,6 +629,8 @@ func (n *network) connectTo(ip utils.IPDesc) {
}
}
// assumes the stateLock is not held. Returns nil if a connection was able to be
// established, or the network is closed.
func (n *network) attemptConnect(ip utils.IPDesc) error {
n.log.Verbo("attempting to connect to %s", ip)
@ -377,6 +645,8 @@ func (n *network) attemptConnect(ip utils.IPDesc) error {
}, n.clientUpgrader)
}
// assumes the stateLock is not held. Returns an error if the peer's connection
// wasn't able to be upgraded.
func (n *network) upgrade(p *peer, upgrader Upgrader) error {
id, conn, err := upgrader.Upgrade(p.conn)
if err != nil {
@ -396,8 +666,7 @@ func (n *network) upgrade(p *peer, upgrader Upgrader) error {
return nil
}
_, ok := n.peers[key]
if ok {
if _, ok := n.peers[key]; ok {
p.conn.Close()
return nil
}
@ -407,6 +676,8 @@ func (n *network) upgrade(p *peer, upgrader Upgrader) error {
return nil
}
// assumes the stateLock is not held. Returns the ips of connections that have
// valid IPs that are marked as validators.
func (n *network) validatorIPs() []utils.IPDesc {
n.stateLock.Lock()
defer n.stateLock.Unlock()
@ -422,7 +693,9 @@ func (n *network) validatorIPs() []utils.IPDesc {
return ips
}
// stateLock is held when called
// assumes the stateLock is held when called
// should only be called after the peer is marked as connected. Should not be
// called after disconnected is called with this peer.
func (n *network) connected(p *peer) {
n.log.Debug("connected to %s at %s", p.id, p.ip)
if !p.ip.IsZero() {
@ -443,11 +716,12 @@ func (n *network) connected(p *peer) {
}
}
// stateLock is held when called
// assumes the stateLock is held when called
// should only be called after the peer is marked as connected.
func (n *network) disconnected(p *peer) {
n.log.Debug("disconnected from %s at %s", p.id, p.ip)
key := p.id.Key()
delete(p.net.peers, key)
delete(n.peers, key)
if !p.ip.IsZero() {
str := p.ip.String()
@ -455,7 +729,7 @@ func (n *network) disconnected(p *peer) {
delete(n.disconnectedIPs, str)
delete(n.connectedIPs, str)
p.net.track(p.ip)
n.track(p.ip)
}
if p.connected {
@ -470,235 +744,3 @@ func (n *network) disconnected(p *peer) {
}
}
}
// Accept is called after every consensus decision
func (n *network) Accept(chainID, containerID ids.ID, container []byte) error {
return n.gossipContainer(chainID, containerID, container)
}
// GetAcceptedFrontier implements the Sender interface.
func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
msg, err := n.b.GetAcceptedFrontier(chainID, requestID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
}
}
}
// AcceptedFrontier implements the Sender interface.
func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.AcceptedFrontier(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("attempted to pack too large of an AcceptedFrontier message.\nNumber of containerIDs: %d",
containerIDs.Len())
return // Packing message failed
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed to send an AcceptedFrontier message to: %s", validatorID)
}
}
// GetAccepted implements the Sender interface.
func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.GetAccepted(chainID, requestID, containerIDs)
if err != nil {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
}
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
}
}
}
// Accepted implements the Sender interface.
func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
msg, err := n.b.Accepted(chainID, requestID, containerIDs)
if err != nil {
n.log.Error("attempted to pack too large of an Accepted message.\nNumber of containerIDs: %d",
containerIDs.Len())
return // Packing message failed
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed to send an Accepted message to: %s", validatorID)
}
}
// Get implements the Sender interface.
func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.Get(chainID, requestID, containerID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed to send a Get message to: %s", validatorID)
}
}
// Put implements the Sender interface.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.Put(chainID, requestID, containerID, container)
if err != nil {
n.log.Error("failed to build Put message because of container of size %d", len(container))
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed to send a Put message to: %s", validatorID)
}
}
// PushQuery implements the Sender interface.
func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
msg, err := n.b.PushQuery(chainID, requestID, containerID, container)
if err != nil {
for _, validatorID := range validatorIDs.List() {
vID := validatorID
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
n.log.Error("attempted to pack too large of a PushQuery message.\nContainer length: %d", len(container))
return // Packing message failed
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed sending a PushQuery message to: %s", vID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
}
}
// PullQuery implements the Sender interface.
func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
msg, err := n.b.PullQuery(chainID, requestID, containerID)
n.log.AssertNoError(err)
n.stateLock.Lock()
defer n.stateLock.Unlock()
for _, validatorID := range validatorIDs.List() {
vID := validatorID
peer, sent := n.peers[vID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed sending a PullQuery message to: %s", vID)
n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
}
}
}
// Chits implements the Sender interface.
func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
msg, err := n.b.Chits(chainID, requestID, votes)
if err != nil {
n.log.Error("failed to build Chits message because of %d votes", votes.Len())
return
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
peer, sent := n.peers[validatorID.Key()]
if sent {
sent = peer.Send(msg)
}
if !sent {
n.log.Debug("failed to send a Chits message to: %s", validatorID)
}
}
// Gossip attempts to gossip the container to the network
func (n *network) Gossip(chainID, containerID ids.ID, container []byte) {
if err := n.gossipContainer(chainID, containerID, container); err != nil {
n.log.Error("error gossiping container %s to %s: %s", containerID, chainID, err)
}
}
func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) error {
msg, err := n.b.Put(chainID, math.MaxUint32, containerID, container)
if err != nil {
return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
}
n.stateLock.Lock()
defer n.stateLock.Unlock()
allPeers := make([]*peer, 0, len(n.peers))
for _, peer := range n.peers {
allPeers = append(allPeers, peer)
}
numToGossip := n.gossipSize
if numToGossip > len(allPeers) {
numToGossip = len(allPeers)
}
sampler := random.Uniform{N: len(allPeers)}
for i := 0; i < numToGossip; i++ {
allPeers[sampler.Sample()].Send(msg)
}
return nil
}

View File

@ -45,14 +45,17 @@ type peer struct {
conn net.Conn
}
// assume the stateLock is held
func (p *peer) Start() {
go p.ReadMessages()
go p.WriteMessages()
// Initially send the version to the peer
p.Version()
go p.Version()
go p.requestVersion()
}
// request the version from the peer until we get the version from them
func (p *peer) requestVersion() {
t := time.NewTicker(p.net.getVersionTimeout)
defer t.Stop()
@ -70,6 +73,7 @@ func (p *peer) requestVersion() {
}
}
// attempt to read messages from the peer
func (p *peer) ReadMessages() {
defer p.Close()
@ -135,6 +139,7 @@ func (p *peer) ReadMessages() {
}
}
// attempt to write messages to the peer
func (p *peer) WriteMessages() {
defer p.Close()
@ -149,7 +154,7 @@ func (p *peer) WriteMessages() {
for len(msg) > 0 {
written, err := p.conn.Write(msg)
if err != nil {
// TODO: log error
p.net.log.Verbo("error writing to %s at %s due to: %s", p.id, p.ip, err)
return
}
msg = msg[written:]
@ -157,7 +162,21 @@ func (p *peer) WriteMessages() {
}
}
// send assumes that close has not been called on this peer yet
func (p *peer) Send(msg Msg) bool {
p.net.stateLock.Lock()
defer p.net.stateLock.Unlock()
return p.send(msg)
}
// send assumes that close has not been called on this peer yet and that the
// stateLock is held.
func (p *peer) send(msg Msg) bool {
if p.closed {
p.net.log.Debug("dropping message to %s due to a closed connection", p.id)
return false
}
select {
case p.sender <- msg.Bytes():
return true
@ -167,6 +186,7 @@ func (p *peer) Send(msg Msg) bool {
}
}
// assumes the stateLock is not held
func (p *peer) handle(msg Msg) {
op := msg.Op()
switch op {
@ -184,7 +204,7 @@ func (p *peer) handle(msg Msg) {
p.GetVersion()
return
}
switch msg.Op() {
switch op {
case GetPeerList:
p.getPeerList(msg)
case PeerList:
@ -212,8 +232,10 @@ func (p *peer) handle(msg Msg) {
}
}
// assumes the stateLock is not held
func (p *peer) Close() { p.once.Do(p.close) }
// assumes only `peer.Close` calls this
func (p *peer) close() {
p.net.stateLock.Lock()
defer p.net.stateLock.Unlock()
@ -224,11 +246,14 @@ func (p *peer) close() {
p.net.disconnected(p)
}
// assumes the stateLock is not held
func (p *peer) GetVersion() {
msg, err := p.net.b.GetVersion()
p.net.log.AssertNoError(err)
p.Send(msg)
}
// assumes the stateLock is not held
func (p *peer) Version() {
msg, err := p.net.b.Version(
p.net.networkID,
@ -239,11 +264,15 @@ func (p *peer) Version() {
p.net.log.AssertNoError(err)
p.Send(msg)
}
// assumes the stateLock is not held
func (p *peer) GetPeerList() {
msg, err := p.net.b.GetPeerList()
p.net.log.AssertNoError(err)
p.Send(msg)
}
// assumes the stateLock is not held
func (p *peer) PeerList(peers []utils.IPDesc) {
msg, err := p.net.b.PeerList(peers)
if err != nil {
@ -254,7 +283,10 @@ func (p *peer) PeerList(peers []utils.IPDesc) {
return
}
// assumes the stateLock is not held
func (p *peer) getVersion(_ Msg) { p.Version() }
// assumes the stateLock is not held
func (p *peer) version(msg Msg) {
if p.connected {
p.net.log.Verbo("dropping duplicated version message from %s", p.id)
@ -340,6 +372,8 @@ func (p *peer) version(msg Msg) {
p.net.stateLock.Lock()
defer p.net.stateLock.Unlock()
// the network connected function can only be called if disconnected wasn't
// already called
if p.closed {
return
}
@ -347,6 +381,8 @@ func (p *peer) version(msg Msg) {
p.connected = true
p.net.connected(p)
}
// assumes the stateLock is not held
func (p *peer) SendPeerList() {
ips := p.net.validatorIPs()
reply, err := p.net.b.PeerList(ips)
@ -356,7 +392,11 @@ func (p *peer) SendPeerList() {
}
p.Send(reply)
}
// assumes the stateLock is not held
func (p *peer) getPeerList(_ Msg) { p.SendPeerList() }
// assumes the stateLock is not held
func (p *peer) peerList(msg Msg) {
ips := msg.Get(Peers).([]utils.IPDesc)
@ -369,6 +409,8 @@ func (p *peer) peerList(msg Msg) {
}
}
}
// assumes the stateLock is not held
func (p *peer) getAcceptedFrontier(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -376,6 +418,8 @@ func (p *peer) getAcceptedFrontier(msg Msg) {
p.net.router.GetAcceptedFrontier(p.id, chainID, requestID)
}
// assumes the stateLock is not held
func (p *peer) acceptedFrontier(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -393,6 +437,8 @@ func (p *peer) acceptedFrontier(msg Msg) {
p.net.router.AcceptedFrontier(p.id, chainID, requestID, containerIDs)
}
// assumes the stateLock is not held
func (p *peer) getAccepted(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -410,6 +456,8 @@ func (p *peer) getAccepted(msg Msg) {
p.net.router.GetAccepted(p.id, chainID, requestID, containerIDs)
}
// assumes the stateLock is not held
func (p *peer) accepted(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -427,6 +475,8 @@ func (p *peer) accepted(msg Msg) {
p.net.router.Accepted(p.id, chainID, requestID, containerIDs)
}
// assumes the stateLock is not held
func (p *peer) get(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -436,6 +486,8 @@ func (p *peer) get(msg Msg) {
p.net.router.Get(p.id, chainID, requestID, containerID)
}
// assumes the stateLock is not held
func (p *peer) put(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -446,6 +498,8 @@ func (p *peer) put(msg Msg) {
p.net.router.Put(p.id, chainID, requestID, containerID, container)
}
// assumes the stateLock is not held
func (p *peer) pushQuery(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -456,6 +510,8 @@ func (p *peer) pushQuery(msg Msg) {
p.net.router.PushQuery(p.id, chainID, requestID, containerID, container)
}
// assumes the stateLock is not held
func (p *peer) pullQuery(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)
@ -465,6 +521,8 @@ func (p *peer) pullQuery(msg Msg) {
p.net.router.PullQuery(p.id, chainID, requestID, containerID)
}
// assumes the stateLock is not held
func (p *peer) chits(msg Msg) {
chainID, err := ids.ToID(msg.Get(ChainID).([]byte))
p.net.log.AssertNoError(err)

View File

@ -313,6 +313,8 @@ func (n *Node) initEventDispatcher() {
n.ConsensusDispatcher = &triggers.EventDispatcher{}
n.ConsensusDispatcher.Initialize(n.Log)
n.Log.AssertNoError(n.ConsensusDispatcher.Register("gossip", n.Net))
}
// Initializes the Platform chain.