// (c) 2019-2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.

package network

import (
	"fmt"
	"math"
	"math/rand"
	"net"
	"sync"
	"sync/atomic"
	"time"

	"github.com/prometheus/client_golang/prometheus"

	"github.com/ava-labs/gecko/api/health"
	"github.com/ava-labs/gecko/ids"
	"github.com/ava-labs/gecko/snow/networking/router"
	"github.com/ava-labs/gecko/snow/networking/sender"
	"github.com/ava-labs/gecko/snow/triggers"
	"github.com/ava-labs/gecko/snow/validators"
	"github.com/ava-labs/gecko/utils"
	"github.com/ava-labs/gecko/utils/formatting"
	"github.com/ava-labs/gecko/utils/logging"
	"github.com/ava-labs/gecko/utils/random"
	"github.com/ava-labs/gecko/utils/timer"
	"github.com/ava-labs/gecko/version"
)

// reasonable default values
const (
	defaultInitialReconnectDelay              = time.Second
	defaultMaxReconnectDelay                  = time.Hour
	DefaultMaxMessageSize              uint32 = 1 << 21
	defaultSendQueueSize                      = 1 << 10
	defaultMaxNetworkPendingSendBytes         = 1 << 29 // 512 MiB
	defaultNetworkPendingSendBytesToRateLimit = defaultMaxNetworkPendingSendBytes / 4
	defaultMaxClockDifference                 = time.Minute
	defaultPeerListGossipSpacing              = time.Minute
	defaultPeerListGossipSize                 = 100
	defaultPeerListStakerGossipFraction       = 2
	defaultGetVersionTimeout                  = 2 * time.Second
	defaultAllowPrivateIPs                    = true
	defaultGossipSize                         = 50
)
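
// For orientation: with these defaults a single message is capped at
// 1 << 21 = 2 MiB, each peer buffers up to 1 << 10 = 1024 outbound messages,
// and rate limiting starts once 1 << 27 = 128 MiB (a quarter of the 512 MiB
// ceiling) of sends are pending network-wide.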

// Network defines the functionality of the networking library.
type Network interface {
	// All consensus messages can be sent through this interface. Thread
	// safety must be managed internally in the network.
	sender.ExternalSender

	// The network must be able to broadcast accepted decisions to random
	// peers. Thread safety must be managed internally in the network.
	triggers.Acceptor

	// The network should be able to report the last time the network
	// interacted with a peer.
	health.Heartbeater

	// Should only be called once. Runs until either a fatal error occurs or
	// the network is closed, and always returns a non-nil error.
	Dispatch() error

	// Attempt to connect to this IP. Thread safety must be managed internally
	// to the network. The network will never stop attempting to connect to
	// this IP.
	Track(ip utils.IPDesc)

	// Register a new handler that is called whenever a peer is connected to
	// or disconnected from. If the handler returns true, then it will never
	// be called again. Thread safety must be managed internally in the
	// network. The handler will initially be called with this local node's
	// ID.
	RegisterHandler(h Handler)

	// Returns the description of the nodes this network is currently
	// connected to externally. Thread safety must be managed internally to
	// the network.
	Peers() []PeerID

	// Close this network and all existing connections it has. Thread safety
	// must be managed internally to the network. Calling close multiple times
	// will return a nil error.
	Close() error
}
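
// A minimal construction-and-dispatch sketch (illustrative only: the logger,
// listener, dialer, upgraders, validator set, router, and bootstrapIP below
// are assumptions supplied by the caller, not defined in this file):
//
//	netw := NewDefaultNetwork(registerer, log, id, myIP, networkID, appVersion,
//		parser, listener, dialer, serverUpgrader, clientUpgrader, vdrs, chainRouter)
//	netw.Track(bootstrapIP) // retried until connected or the network is closed
//	go func() {
//		err := netw.Dispatch() // blocks until a fatal error or Close()
//		log.Error("network dispatch exited: %s", err)
//	}()
//	defer netw.Close()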

type network struct {
	// The metrics that this network tracks
	metrics

	log            logging.Logger
	id             ids.ShortID
	ip             utils.IPDesc
	networkID      uint32
	version        version.Version
	parser         version.Parser
	listener       net.Listener
	dialer         Dialer
	serverUpgrader Upgrader
	clientUpgrader Upgrader
	vdrs           validators.Set // set of current validators in the AVAnet
	router         router.Router  // router must be thread safe

	nodeID uint32

	clock         timer.Clock
	lastHeartbeat int64

	initialReconnectDelay              time.Duration
	maxReconnectDelay                  time.Duration
	maxMessageSize                     uint32
	sendQueueSize                      int
	maxNetworkPendingSendBytes         int
	networkPendingSendBytesToRateLimit int
	maxClockDifference                 time.Duration
	peerListGossipSpacing              time.Duration
	peerListGossipSize                 int
	peerListStakerGossipFraction       int
	getVersionTimeout                  time.Duration
	allowPrivateIPs                    bool
	gossipSize                         int

	executor timer.Executor

	b Builder

	// stateLock guards the mutable connection state below
	stateLock       sync.Mutex
	pendingBytes    int
	closed          bool
	disconnectedIPs map[string]struct{}
	connectedIPs    map[string]struct{}
	retryDelay      map[string]time.Duration
	// TODO: bound the size of [myIPs] to avoid DoS. LRU caching would be ideal
	myIPs    map[string]struct{} // set of IPs that resulted in my ID.
	peers    map[[20]byte]*peer
	handlers []Handler
}

// NewDefaultNetwork returns a new Network implementation with the provided
// parameters and some reasonable default values.
func NewDefaultNetwork(
	registerer prometheus.Registerer,
	log logging.Logger,
	id ids.ShortID,
	ip utils.IPDesc,
	networkID uint32,
	version version.Version,
	parser version.Parser,
	listener net.Listener,
	dialer Dialer,
	serverUpgrader,
	clientUpgrader Upgrader,
	vdrs validators.Set,
	router router.Router,
) Network {
	return NewNetwork(
		registerer,
		log,
		id,
		ip,
		networkID,
		version,
		parser,
		listener,
		dialer,
		serverUpgrader,
		clientUpgrader,
		vdrs,
		router,
		defaultInitialReconnectDelay,
		defaultMaxReconnectDelay,
		DefaultMaxMessageSize,
		defaultSendQueueSize,
		defaultMaxNetworkPendingSendBytes,
		defaultNetworkPendingSendBytesToRateLimit,
		defaultMaxClockDifference,
		defaultPeerListGossipSpacing,
		defaultPeerListGossipSize,
		defaultPeerListStakerGossipFraction,
		defaultGetVersionTimeout,
		defaultAllowPrivateIPs,
		defaultGossipSize,
	)
}

// NewNetwork returns a new Network implementation with the provided parameters.
func NewNetwork(
	registerer prometheus.Registerer,
	log logging.Logger,
	id ids.ShortID,
	ip utils.IPDesc,
	networkID uint32,
	version version.Version,
	parser version.Parser,
	listener net.Listener,
	dialer Dialer,
	serverUpgrader,
	clientUpgrader Upgrader,
	vdrs validators.Set,
	router router.Router,
	initialReconnectDelay,
	maxReconnectDelay time.Duration,
	maxMessageSize uint32,
	sendQueueSize int,
	maxNetworkPendingSendBytes int,
	networkPendingSendBytesToRateLimit int,
	maxClockDifference time.Duration,
	peerListGossipSpacing time.Duration,
	peerListGossipSize int,
	peerListStakerGossipFraction int,
	getVersionTimeout time.Duration,
	allowPrivateIPs bool,
	gossipSize int,
) Network {
	net := &network{
		log:                                log,
		id:                                 id,
		ip:                                 ip,
		networkID:                          networkID,
		version:                            version,
		parser:                             parser,
		listener:                           listener,
		dialer:                             dialer,
		serverUpgrader:                     serverUpgrader,
		clientUpgrader:                     clientUpgrader,
		vdrs:                               vdrs,
		router:                             router,
		nodeID:                             rand.Uint32(),
		initialReconnectDelay:              initialReconnectDelay,
		maxReconnectDelay:                  maxReconnectDelay,
		maxMessageSize:                     maxMessageSize,
		sendQueueSize:                      sendQueueSize,
		maxNetworkPendingSendBytes:         maxNetworkPendingSendBytes,
		networkPendingSendBytesToRateLimit: networkPendingSendBytesToRateLimit,
		maxClockDifference:                 maxClockDifference,
		peerListGossipSpacing:              peerListGossipSpacing,
		peerListGossipSize:                 peerListGossipSize,
		peerListStakerGossipFraction:       peerListStakerGossipFraction,
		getVersionTimeout:                  getVersionTimeout,
		allowPrivateIPs:                    allowPrivateIPs,
		gossipSize:                         gossipSize,

		disconnectedIPs: make(map[string]struct{}),
		connectedIPs:    make(map[string]struct{}),
		retryDelay:      make(map[string]time.Duration),
		myIPs:           map[string]struct{}{ip.String(): {}},
		peers:           make(map[[20]byte]*peer),
	}
	net.initialize(registerer)
	net.executor.Initialize()
	net.heartbeat()
	return net
}

// GetAcceptedFrontier implements the Sender interface.
func (n *network) GetAcceptedFrontier(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32) {
	msg, err := n.b.GetAcceptedFrontier(chainID, requestID)
	n.log.AssertNoError(err)

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	for _, validatorID := range validatorIDs.List() {
		vID := validatorID
		// [sent] does double duty: first it reports whether the peer is
		// known, then whether the message was queued to that peer.
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.send(msg)
		}
		if !sent {
			n.executor.Add(func() { n.router.GetAcceptedFrontierFailed(vID, chainID, requestID) })
			n.getAcceptedFrontier.numFailed.Inc()
		} else {
			n.getAcceptedFrontier.numSent.Inc()
		}
	}
}
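
// GetAcceptedFrontier carries only fixed-size fields (chainID, requestID), so
// packing it is not expected to fail; presumably that is why the builder
// error is asserted away above rather than handled like the other messages.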

// AcceptedFrontier implements the Sender interface.
func (n *network) AcceptedFrontier(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	msg, err := n.b.AcceptedFrontier(chainID, requestID, containerIDs)
	if err != nil {
		n.log.Error("failed to build AcceptedFrontier(%s, %d, %s): %s",
			chainID,
			requestID,
			containerIDs,
			err)
		return // Packing message failed
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send AcceptedFrontier(%s, %s, %d, %s)",
			validatorID,
			chainID,
			requestID,
			containerIDs)
		n.acceptedFrontier.numFailed.Inc()
	} else {
		n.acceptedFrontier.numSent.Inc()
	}
}

// GetAccepted implements the Sender interface.
func (n *network) GetAccepted(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	msg, err := n.b.GetAccepted(chainID, requestID, containerIDs)
	if err != nil {
		n.log.Error("failed to build GetAccepted(%s, %d, %s): %s",
			chainID,
			requestID,
			containerIDs,
			err)
		for _, validatorID := range validatorIDs.List() {
			vID := validatorID
			n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
		}
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	for _, validatorID := range validatorIDs.List() {
		vID := validatorID
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.send(msg)
		}
		if !sent {
			n.log.Debug("failed to send GetAccepted(%s, %s, %d, %s)",
				validatorID,
				chainID,
				requestID,
				containerIDs)
			n.executor.Add(func() { n.router.GetAcceptedFailed(vID, chainID, requestID) })
			n.getAccepted.numFailed.Inc()
		} else {
			n.getAccepted.numSent.Inc()
		}
	}
}

// Accepted implements the Sender interface.
func (n *network) Accepted(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerIDs ids.Set) {
	msg, err := n.b.Accepted(chainID, requestID, containerIDs)
	if err != nil {
		n.log.Error("failed to build Accepted(%s, %d, %s): %s",
			chainID,
			requestID,
			containerIDs,
			err)
		return // Packing message failed
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send Accepted(%s, %s, %d, %s)",
			validatorID,
			chainID,
			requestID,
			containerIDs)
		n.accepted.numFailed.Inc()
	} else {
		n.accepted.numSent.Inc()
	}
}

// GetAncestors implements the Sender interface.
func (n *network) GetAncestors(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
	msg, err := n.b.GetAncestors(chainID, requestID, containerID)
	if err != nil {
		n.log.Error("failed to build GetAncestors message: %s", err)
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send GetAncestors(%s, %s, %d, %s)",
			validatorID,
			chainID,
			requestID,
			containerID)
		n.executor.Add(func() { n.router.GetAncestorsFailed(validatorID, chainID, requestID) })
		n.getAncestors.numFailed.Inc()
	} else {
		n.getAncestors.numSent.Inc()
	}
}

// MultiPut implements the Sender interface.
func (n *network) MultiPut(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containers [][]byte) {
	msg, err := n.b.MultiPut(chainID, requestID, containers)
	if err != nil {
		n.log.Error("failed to build MultiPut message with %d containers: %s", len(containers), err)
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send MultiPut(%s, %s, %d, %d)",
			validatorID,
			chainID,
			requestID,
			len(containers))
		n.multiPut.numFailed.Inc()
	} else {
		n.multiPut.numSent.Inc()
	}
}

// Get implements the Sender interface.
func (n *network) Get(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID) {
	msg, err := n.b.Get(chainID, requestID, containerID)
	n.log.AssertNoError(err)

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send Get(%s, %s, %d, %s)",
			validatorID,
			chainID,
			requestID,
			containerID)
		n.executor.Add(func() { n.router.GetFailed(validatorID, chainID, requestID) })
		n.get.numFailed.Inc()
	} else {
		n.get.numSent.Inc()
	}
}

// Put implements the Sender interface.
func (n *network) Put(validatorID ids.ShortID, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
	msg, err := n.b.Put(chainID, requestID, containerID, container)
	if err != nil {
		n.log.Error("failed to build Put(%s, %d, %s): %s. len(container): %d",
			chainID,
			requestID,
			containerID,
			err,
			len(container))
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send Put(%s, %s, %d, %s)",
			validatorID,
			chainID,
			requestID,
			containerID)
		n.log.Verbo("container: %s", formatting.DumpBytes{Bytes: container})
		n.put.numFailed.Inc()
	} else {
		n.put.numSent.Inc()
	}
}

// PushQuery implements the Sender interface.
func (n *network) PushQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID, container []byte) {
	msg, err := n.b.PushQuery(chainID, requestID, containerID, container)
	if err != nil {
		n.log.Error("failed to build PushQuery(%s, %d, %s): %s. len(container): %d",
			chainID,
			requestID,
			containerID,
			err,
			len(container))
		n.log.Verbo("container: %s", formatting.DumpBytes{Bytes: container})
		for _, validatorID := range validatorIDs.List() {
			vID := validatorID
			n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
		}
		return // Packing message failed
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	for _, validatorID := range validatorIDs.List() {
		vID := validatorID
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.send(msg)
		}
		if !sent {
			n.log.Debug("failed to send PushQuery(%s, %s, %d, %s)",
				validatorID,
				chainID,
				requestID,
				containerID)
			n.log.Verbo("container: %s", formatting.DumpBytes{Bytes: container})
			n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
			n.pushQuery.numFailed.Inc()
		} else {
			n.pushQuery.numSent.Inc()
		}
	}
}
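
// PushQuery ships the whole container alongside its ID, so building the
// message can fail on an oversized container; PullQuery below sends only
// fixed-size fields, which is presumably why its builder error is treated as
// unrecoverable via AssertNoError instead.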

// PullQuery implements the Sender interface.
func (n *network) PullQuery(validatorIDs ids.ShortSet, chainID ids.ID, requestID uint32, containerID ids.ID) {
	msg, err := n.b.PullQuery(chainID, requestID, containerID)
	n.log.AssertNoError(err)

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	for _, validatorID := range validatorIDs.List() {
		vID := validatorID
		peer, sent := n.peers[vID.Key()]
		if sent {
			sent = peer.send(msg)
		}
		if !sent {
			n.log.Debug("failed to send PullQuery(%s, %s, %d, %s)",
				validatorID,
				chainID,
				requestID,
				containerID)
			n.executor.Add(func() { n.router.QueryFailed(vID, chainID, requestID) })
			n.pullQuery.numFailed.Inc()
		} else {
			n.pullQuery.numSent.Inc()
		}
	}
}

// Chits implements the Sender interface.
func (n *network) Chits(validatorID ids.ShortID, chainID ids.ID, requestID uint32, votes ids.Set) {
	msg, err := n.b.Chits(chainID, requestID, votes)
	if err != nil {
		n.log.Error("failed to build Chits(%s, %d, %s): %s",
			chainID,
			requestID,
			votes,
			err)
		return
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peer, sent := n.peers[validatorID.Key()]
	if sent {
		sent = peer.send(msg)
	}
	if !sent {
		n.log.Debug("failed to send Chits(%s, %s, %d, %s)",
			validatorID,
			chainID,
			requestID,
			votes)
		n.chits.numFailed.Inc()
	} else {
		n.chits.numSent.Inc()
	}
}

// Gossip attempts to gossip the container to the network
func (n *network) Gossip(chainID, containerID ids.ID, container []byte) {
	if err := n.gossipContainer(chainID, containerID, container); err != nil {
		n.log.Debug("failed to Gossip(%s, %s): %s", chainID, containerID, err)
		n.log.Verbo("container:\n%s", formatting.DumpBytes{Bytes: container})
	}
}

// Accept is called after every consensus decision. Unlike Gossip, it
// propagates any gossiping error back to the caller.
func (n *network) Accept(chainID, containerID ids.ID, container []byte) error {
	return n.gossipContainer(chainID, containerID, container)
}

// heartbeat registers a new heartbeat to signal liveness
func (n *network) heartbeat() { atomic.StoreInt64(&n.lastHeartbeat, n.clock.Time().Unix()) }

// GetHeartbeat returns the most recent heartbeat time
func (n *network) GetHeartbeat() int64 { return atomic.LoadInt64(&n.lastHeartbeat) }
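
// A liveness probe can be layered on GetHeartbeat. A minimal sketch, assuming
// the caller picks a stall threshold (netw and maxStall below are assumptions,
// not part of this file) and that lastHeartbeat is compared against the same
// clock that produced it:
//
//	maxStall := 5 * time.Minute
//	stalled := time.Now().Unix()-netw.GetHeartbeat() > int64(maxStall/time.Second)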

// Dispatch starts accepting connections from other nodes attempting to connect
// to this node.
func (n *network) Dispatch() error {
	go n.gossip()
	for {
		conn, err := n.listener.Accept()
		if err != nil {
			n.stateLock.Lock()
			closed := n.closed
			n.stateLock.Unlock()

			if closed {
				return err
			}
			n.log.Debug("error during server accept: %s", err)
			continue
		}
		go n.upgrade(&peer{
			net:  n,
			conn: conn,
		}, n.serverUpgrader)
	}
}

// RegisterHandler implements the Network interface
func (n *network) RegisterHandler(h Handler) {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	if h.Connected(n.id) {
		return
	}
	for _, peer := range n.peers {
		if peer.connected {
			if h.Connected(peer.id) {
				return
			}
		}
	}
	n.handlers = append(n.handlers, h)
}

// Peers implements the Network interface
func (n *network) Peers() []PeerID {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	peers := []PeerID{}
	for _, peer := range n.peers {
		if peer.connected {
			peers = append(peers, PeerID{
				IP:           peer.conn.RemoteAddr().String(),
				PublicIP:     peer.ip.String(),
				ID:           peer.id,
				Version:      peer.versionStr,
				LastSent:     time.Unix(atomic.LoadInt64(&peer.lastSent), 0),
				LastReceived: time.Unix(atomic.LoadInt64(&peer.lastReceived), 0),
			})
		}
	}
	return peers
}

// Close implements the Network interface
func (n *network) Close() error {
	n.stateLock.Lock()
	if n.closed {
		n.stateLock.Unlock()
		return nil
	}

	n.closed = true
	err := n.listener.Close()

	peersToClose := []*peer(nil)
	for _, peer := range n.peers {
		peersToClose = append(peersToClose, peer)
	}
	n.stateLock.Unlock()

	for _, peer := range peersToClose {
		peer.Close() // Grabs the stateLock
	}
	return err
}

// Track implements the Network interface
func (n *network) Track(ip utils.IPDesc) {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	n.track(ip)
}

// assumes the stateLock is not held.
func (n *network) gossipContainer(chainID, containerID ids.ID, container []byte) error {
	msg, err := n.b.Put(chainID, math.MaxUint32, containerID, container)
	if err != nil {
		return fmt.Errorf("attempted to pack too large of a Put message.\nContainer length: %d", len(container))
	}

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	allPeers := make([]*peer, 0, len(n.peers))
	for _, peer := range n.peers {
		allPeers = append(allPeers, peer)
	}

	numToGossip := n.gossipSize
	if numToGossip > len(allPeers) {
		numToGossip = len(allPeers)
	}

	sampler := random.Uniform{N: len(allPeers)}
	for i := 0; i < numToGossip; i++ {
		if allPeers[sampler.Sample()].send(msg) {
			n.put.numSent.Inc()
		} else {
			n.put.numFailed.Inc()
		}
	}
	return nil
}
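
// Gossiped containers are regular Put messages with requestID pinned to
// math.MaxUint32, presumably so receivers can distinguish gossip from a reply
// to an outstanding Get. With the defaults above, each accepted container
// therefore fans out to at most defaultGossipSize (50) randomly sampled peers.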

// assumes the stateLock is held.
func (n *network) track(ip utils.IPDesc) {
	if n.closed {
		return
	}

	str := ip.String()
	if _, ok := n.disconnectedIPs[str]; ok {
		return
	}
	if _, ok := n.connectedIPs[str]; ok {
		return
	}
	if _, ok := n.myIPs[str]; ok {
		return
	}
	n.disconnectedIPs[str] = struct{}{}

	go n.connectTo(ip)
}

// assumes the stateLock is not held. Only returns after the network is closed.
func (n *network) gossip() {
	t := time.NewTicker(n.peerListGossipSpacing)
	defer t.Stop()

	for range t.C {
		ips := n.validatorIPs()
		if len(ips) == 0 {
			n.log.Debug("skipping validator gossiping as no public validators are connected")
			continue
		}
		msg, err := n.b.PeerList(ips)
		if err != nil {
			n.log.Error("failed to build peer list to gossip: %s. len(ips): %d",
				err,
				len(ips))
			continue
		}

		n.stateLock.Lock()
		if n.closed {
			n.stateLock.Unlock()
			return
		}

		stakers := []*peer(nil)
		nonStakers := []*peer(nil)
		for _, peer := range n.peers {
			if n.vdrs.Contains(peer.id) {
				stakers = append(stakers, peer)
			} else {
				nonStakers = append(nonStakers, peer)
			}
		}

		// reserve ceil(peerListGossipSize / peerListStakerGossipFraction)
		// slots for stakers; non-stakers receive whatever remains
		numStakersToSend := (n.peerListGossipSize + n.peerListStakerGossipFraction - 1) / n.peerListStakerGossipFraction
		if len(stakers) < numStakersToSend {
			numStakersToSend = len(stakers)
		}
		numNonStakersToSend := n.peerListGossipSize - numStakersToSend
		if len(nonStakers) < numNonStakersToSend {
			numNonStakersToSend = len(nonStakers)
		}

		sampler := random.Uniform{N: len(stakers)}
		for i := 0; i < numStakersToSend; i++ {
			stakers[sampler.Sample()].send(msg)
		}
		sampler.N = len(nonStakers)
		sampler.Replace()
		for i := 0; i < numNonStakersToSend; i++ {
			nonStakers[sampler.Sample()].send(msg)
		}
		n.stateLock.Unlock()
	}
}
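
// Worked example with the defaults above: peerListGossipSize = 100 and
// peerListStakerGossipFraction = 2 give ceil(100/2) = 50 slots aimed at
// stakers each minute, with the remaining 50 going to non-stakers; when fewer
// stakers are connected, their unused slots roll over to non-stakers.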

// assumes the stateLock is not held. Only returns if the ip is connected to or
// the network is closed
func (n *network) connectTo(ip utils.IPDesc) {
	str := ip.String()
	n.stateLock.Lock()
	delay := n.retryDelay[str]
	n.stateLock.Unlock()

	for {
		time.Sleep(delay)

		if delay == 0 {
			delay = n.initialReconnectDelay
		}

		// randomized exponential backoff: scale the delay by a factor drawn
		// uniformly from [1, 2)
		delay = time.Duration(float64(delay) * (1 + rand.Float64()))
		if delay > n.maxReconnectDelay {
			// set the timeout to [.75, 1) * maxReconnectDelay
			delay = time.Duration(float64(n.maxReconnectDelay) * (3 + rand.Float64()) / 4)
		}

		n.stateLock.Lock()
		_, isDisconnected := n.disconnectedIPs[str]
		_, isConnected := n.connectedIPs[str]
		_, isMyself := n.myIPs[str]
		closed := n.closed

		if !isDisconnected || isConnected || isMyself || closed {
			// If the IP was discovered by the peer connecting to us, we don't
			// need to attempt to connect anymore

			// If the IP was discovered to be our IP address, we don't need to
			// attempt to connect anymore

			// If the network was closed, we should stop attempting to connect
			// to the peer

			n.stateLock.Unlock()
			return
		}
		n.retryDelay[str] = delay
		n.stateLock.Unlock()

		err := n.attemptConnect(ip)
		if err == nil {
			return
		}
		n.log.Verbo("error attempting to connect to %s: %s. Reattempting in %s",
			ip, err, delay)
	}
}
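
// With the defaults above, the first attempt at a new IP is immediate (the
// stored delay starts at zero); subsequent delays start in the band
// [1s, 2s) (defaultInitialReconnectDelay scaled by the random factor) and
// grow by 1.5x per attempt in expectation, so a persistently unreachable peer
// saturates near defaultMaxReconnectDelay (1h) in the band [45m, 1h).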

// assumes the stateLock is not held. Returns nil if a connection was able to be
// established, or the network is closed.
func (n *network) attemptConnect(ip utils.IPDesc) error {
	n.log.Verbo("attempting to connect to %s", ip)

	conn, err := n.dialer.Dial(ip)
	if err != nil {
		return err
	}
	return n.upgrade(&peer{
		net:  n,
		ip:   ip,
		conn: conn,
	}, n.clientUpgrader)
}

// assumes the stateLock is not held. Returns an error if the peer's connection
// wasn't able to be upgraded.
func (n *network) upgrade(p *peer, upgrader Upgrader) error {
	id, conn, err := upgrader.Upgrade(p.conn)
	if err != nil {
		n.log.Verbo("failed to upgrade connection with %s", err)
		return err
	}
	p.sender = make(chan []byte, n.sendQueueSize)
	p.id = id
	p.conn = conn

	key := id.Key()

	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	if n.closed {
		p.conn.Close()
		return nil
	}

	// if this connection is myself, then I should delete the connection and
	// mark the IP as one of mine.
	if id.Equals(n.id) {
		if !p.ip.IsZero() {
			// if n.ip is less useful than p.ip set it to this IP
			if n.ip.IsZero() {
				n.log.Info("setting my ip to %s because I was able to connect to myself through this channel",
					p.ip)
				n.ip = p.ip
			}
			str := p.ip.String()
			delete(n.disconnectedIPs, str)
			delete(n.retryDelay, str)
			n.myIPs[str] = struct{}{}
		}
		p.conn.Close()
		return nil
	}

	if _, ok := n.peers[key]; ok {
		if !p.ip.IsZero() {
			str := p.ip.String()
			delete(n.disconnectedIPs, str)
			delete(n.retryDelay, str)
		}
		p.conn.Close()
		return nil
	}

	n.peers[key] = p
	n.numPeers.Set(float64(len(n.peers)))
	p.Start()
	return nil
}

// assumes the stateLock is not held. Returns the ips of connections that have
// valid IPs that are marked as validators.
func (n *network) validatorIPs() []utils.IPDesc {
	n.stateLock.Lock()
	defer n.stateLock.Unlock()

	ips := []utils.IPDesc(nil)
	for _, peer := range n.peers {
		if peer.connected &&
			!peer.ip.IsZero() &&
			n.vdrs.Contains(peer.id) {
			ips = append(ips, peer.ip)
		}
	}
	return ips
}

// assumes the stateLock is held when called
// should only be called after the peer is marked as connected. Should not be
// called after disconnected is called with this peer.
func (n *network) connected(p *peer) {
	n.log.Debug("connected to %s at %s", p.id, p.ip)
	if !p.ip.IsZero() {
		str := p.ip.String()

		delete(n.disconnectedIPs, str)
		delete(n.retryDelay, str)
		n.connectedIPs[str] = struct{}{}
	}

	for i := 0; i < len(n.handlers); {
		if n.handlers[i].Connected(p.id) {
			newLen := len(n.handlers) - 1
			n.handlers[i] = n.handlers[newLen] // remove the current handler
			n.handlers = n.handlers[:newLen]
		} else {
			i++
		}
	}
}

// assumes the stateLock is held when called
// should only be called after the peer is marked as connected.
func (n *network) disconnected(p *peer) {
	n.log.Debug("disconnected from %s at %s", p.id, p.ip)
	key := p.id.Key()
	delete(n.peers, key)
	n.numPeers.Set(float64(len(n.peers)))

	if !p.ip.IsZero() {
		str := p.ip.String()

		delete(n.disconnectedIPs, str)
		delete(n.connectedIPs, str)

		n.track(p.ip)
	}

	if p.connected {
		for i := 0; i < len(n.handlers); {
			if n.handlers[i].Disconnected(p.id) {
				newLen := len(n.handlers) - 1
				n.handlers[i] = n.handlers[newLen] // remove the current handler
				n.handlers = n.handlers[:newLen]
			} else {
				i++
			}
		}
	}
}