tendermint/p2p/switch.go

392 lines
10 KiB
Go
Raw Normal View History

2014-07-07 20:03:50 -07:00
package p2p
import (
"errors"
"fmt"
"net"
2015-06-17 14:55:16 -07:00
"strconv"
"time"
"github.com/tendermint/tendermint/Godeps/_workspace/src/github.com/tendermint/log15"
acm "github.com/tendermint/tendermint/account"
2015-04-01 17:30:16 -07:00
. "github.com/tendermint/tendermint/common"
"github.com/tendermint/tendermint/types"
)
2014-09-14 15:37:32 -07:00
type Reactor interface {
Service // Start, Stop
SetSwitch(*Switch)
2014-09-14 15:37:32 -07:00
GetChannels() []*ChannelDescriptor
AddPeer(peer *Peer)
RemovePeer(peer *Peer, reason interface{})
Receive(chID byte, peer *Peer, msgBytes []byte)
2014-09-14 15:37:32 -07:00
}
2015-04-08 11:35:17 -07:00
//--------------------------------------
// BaseReactor is an embeddable starting point for Reactor implementations.
// It couples a QuitService with a reference to the Switch the reactor has
// been added to.
type BaseReactor struct {
	QuitService // Provides Start, Stop, .Quit

	// Switch is assigned through SetSwitch.
	Switch *Switch
}
// NewBaseReactor returns a BaseReactor whose embedded QuitService is built
// from log, name and impl. The Switch field is left nil.
func NewBaseReactor(log log15.Logger, name string, impl Reactor) *BaseReactor {
	base := new(BaseReactor) // Switch starts out as the zero value (nil)
	base.QuitService = *NewQuitService(log, name, impl)
	return base
}
2015-04-08 11:35:17 -07:00
// SetSwitch stores sw in the exported Switch field. Switch.AddReactor calls
// this when the reactor is registered.
func (br *BaseReactor) SetSwitch(sw *Switch) {
	br.Switch = sw
}
// GetChannels is the default implementation: the reactor owns no channels.
func (br *BaseReactor) GetChannels() []*ChannelDescriptor {
	return nil
}

// AddPeer is the default implementation and does nothing.
func (br *BaseReactor) AddPeer(peer *Peer) {
}

// RemovePeer is the default implementation and does nothing.
func (br *BaseReactor) RemovePeer(peer *Peer, reason interface{}) {
}

// Receive is the default implementation and discards the message.
func (br *BaseReactor) Receive(chID byte, peer *Peer, msgBytes []byte) {
}
2015-04-08 11:35:17 -07:00
2014-09-14 15:37:32 -07:00
//-----------------------------------------------------------------------------
/*
2014-12-23 19:31:24 -08:00
The `Switch` handles peer connections and exposes an API to receive incoming messages
on `Reactors`. Each `Reactor` is responsible for handling incoming messages of one
or more `Channels`. So while sending outgoing messages is typically performed on the peer,
incoming messages are received on the reactor.
*/
type Switch struct {
BaseService
2015-04-01 14:52:25 -07:00
listeners []Listener
reactors map[string]Reactor
2014-09-14 15:37:32 -07:00
chDescs []*ChannelDescriptor
reactorsByCh map[byte]Reactor
peers *PeerSet
dialing *CMap
nodeInfo *types.NodeInfo // our node info
nodePrivKey acm.PrivKeyEd25519 // our node privkey
}
// Sentinel errors for rejected peers.
// NOTE(review): neither value is returned directly in this file; presumably
// they come back from PeerSet.Add -- confirm against the PeerSet source.
var (
	ErrSwitchDuplicatePeer      = errors.New("Duplicate peer")
	ErrSwitchMaxPeersPerIPRange = errors.New("IP range has too many peers")
)
const (
	// peerDialTimeoutSeconds bounds the outbound dial in DialPeerWithAddress.
	peerDialTimeoutSeconds = 3 // TODO make this configurable
	// handshakeTimeoutSeconds is the connection deadline applied while
	// AddPeerWithConnection performs its handshakes.
	handshakeTimeoutSeconds = 20 // TODO make this configurable
	// maxNumPeers is the peer-set size at which listenerRoutine starts
	// ignoring new inbound connections.
	maxNumPeers = 50 // TODO make this configurable
)
func NewSwitch() *Switch {
2014-10-22 17:20:44 -07:00
sw := &Switch{
reactors: make(map[string]Reactor),
chDescs: make([]*ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
2014-09-14 15:37:32 -07:00
peers: NewPeerSet(),
dialing: NewCMap(),
nodeInfo: nil,
}
sw.BaseService = *NewBaseService(log, "P2P Switch", sw)
2014-10-22 17:20:44 -07:00
return sw
}
2015-03-25 02:36:59 -07:00
// Not goroutine safe.
func (sw *Switch) AddReactor(name string, reactor Reactor) Reactor {
// Validate the reactor.
// No two reactors can share the same channel.
reactorChannels := reactor.GetChannels()
for _, chDesc := range reactorChannels {
chID := chDesc.ID
if sw.reactorsByCh[chID] != nil {
PanicSanity(fmt.Sprintf("Channel %X has multiple reactors %v & %v", chID, sw.reactorsByCh[chID], reactor))
2014-10-22 17:20:44 -07:00
}
sw.chDescs = append(sw.chDescs, chDesc)
sw.reactorsByCh[chID] = reactor
2014-07-10 02:19:50 -07:00
}
sw.reactors[name] = reactor
reactor.SetSwitch(sw)
2015-03-25 02:36:59 -07:00
return reactor
2014-07-09 18:33:44 -07:00
}
// Reactors returns the switch's own name -> reactor map (not a copy), so
// changes made through the returned map affect the switch.
// Not goroutine safe.
func (sw *Switch) Reactors() map[string]Reactor {
	return sw.reactors
}
// Not goroutine safe.
2015-03-25 02:36:59 -07:00
func (sw *Switch) Reactor(name string) Reactor {
return sw.reactors[name]
}
// AddListener appends l to the switch's listeners. OnStart spawns an accept
// goroutine for each listener present at that time.
// Not goroutine safe.
func (sw *Switch) AddListener(l Listener) {
	sw.listeners = append(sw.listeners, l)
}
// Listeners returns the switch's listener slice itself (not a copy).
// Not goroutine safe.
func (sw *Switch) Listeners() []Listener {
	return sw.listeners
}
// IsListening reports whether at least one listener is attached to the
// switch. It does not check that any listener is actually accepting.
// Not goroutine safe.
func (sw *Switch) IsListening() bool {
	return len(sw.listeners) != 0
}
// SetNodeInfo stores the node info this switch presents to peers during the
// handshake. The pointer is kept as-is, not copied.
// Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo *types.NodeInfo) {
	sw.nodeInfo = nodeInfo
}
2015-07-10 08:50:58 -07:00
// NodeInfo returns the node info pointer held by the switch; it is nil until
// SetNodeInfo has been called.
// Not goroutine safe.
func (sw *Switch) NodeInfo() *types.NodeInfo {
	return sw.nodeInfo
}
// SetNodePrivKey records the node's private key and, when node info has
// already been set, replaces its PubKey with the key derived from
// nodePrivKey. If node info is still nil only the private key is stored.
//
// Not goroutine safe.
// NOTE: Overwrites sw.nodeInfo.PubKey
func (sw *Switch) SetNodePrivKey(nodePrivKey acm.PrivKeyEd25519) {
	sw.nodePrivKey = nodePrivKey
	if sw.nodeInfo == nil {
		return
	}
	sw.nodeInfo.PubKey = nodePrivKey.PubKey().(acm.PubKeyEd25519)
}
// Switch.Start() starts all the reactors, peers, and listeners.
func (sw *Switch) OnStart() error {
2015-07-21 18:31:01 -07:00
sw.BaseService.OnStart()
// Start reactors
for _, reactor := range sw.reactors {
_, err := reactor.Start()
if err != nil {
return err
}
}
// Start peers
for _, peer := range sw.peers.List() {
sw.startInitPeer(peer)
}
// Start listeners
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
2015-04-01 14:52:25 -07:00
}
return nil
2015-04-01 14:52:25 -07:00
}
2015-07-21 18:31:01 -07:00
func (sw *Switch) OnStop() {
sw.BaseService.OnStop()
// Stop listeners
for _, listener := range sw.listeners {
listener.Stop()
}
sw.listeners = nil
// Stop peers
for _, peer := range sw.peers.List() {
peer.Stop()
}
sw.peers = NewPeerSet()
// Stop reactors
for _, reactor := range sw.reactors {
reactor.Stop()
}
2015-04-01 14:52:25 -07:00
}
// NOTE: This performs a blocking handshake before the peer is added.
// CONTRACT: Iff error is returned, peer is nil, and conn is immediately closed.
2014-10-22 17:20:44 -07:00
func (sw *Switch) AddPeerWithConnection(conn net.Conn, outbound bool) (*Peer, error) {
2015-07-17 14:19:16 -07:00
// Set deadline for handshake so we don't block forever on conn.ReadFull
conn.SetDeadline(time.Now().Add(handshakeTimeoutSeconds * time.Second))
// First, encrypt the connection.
sconn, err := MakeSecretConnection(conn, sw.nodePrivKey)
if err != nil {
conn.Close()
return nil, err
}
// Then, perform node handshake
peerNodeInfo, err := peerHandshake(sconn, sw.nodeInfo)
if err != nil {
sconn.Close()
return nil, err
}
// Check that the professed PubKey matches the sconn's.
if !peerNodeInfo.PubKey.Equals(sconn.RemotePubKey()) {
sconn.Close()
return nil, fmt.Errorf("Ignoring connection with unmatching pubkey: %v vs %v",
peerNodeInfo.PubKey, sconn.RemotePubKey())
}
2015-07-15 14:31:03 -07:00
// Avoid self
if peerNodeInfo.PubKey.Equals(sw.nodeInfo.PubKey) {
sconn.Close()
return nil, fmt.Errorf("Ignoring connection from self")
}
// Check version, chain id
2015-04-22 13:50:37 -07:00
if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
sconn.Close()
2015-04-22 13:50:37 -07:00
return nil, err
}
2015-04-22 13:50:37 -07:00
// The peerNodeInfo is not verified, so overwrite
// the IP, and the port too if we dialed out
// Everything else we just have to trust
ip, port, _ := net.SplitHostPort(sconn.RemoteAddr().String())
2015-06-17 14:55:16 -07:00
peerNodeInfo.Host = ip
if outbound {
porti, _ := strconv.Atoi(port)
peerNodeInfo.P2PPort = uint16(porti)
}
peer := newPeer(sconn, peerNodeInfo, outbound, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
// Add the peer to .peers
2015-07-13 13:17:47 -07:00
// ignore if duplicate or if we already have too many for that IP range
2015-07-12 10:54:34 -07:00
if err := sw.peers.Add(peer); err != nil {
2015-07-19 14:49:13 -07:00
log.Notice("Ignoring peer", "error", err, "peer", peer)
2015-07-20 15:02:12 -07:00
peer.Stop()
2015-07-12 10:54:34 -07:00
return nil, err
}
2015-07-17 14:19:16 -07:00
// remove deadline and start peer
conn.SetDeadline(time.Time{})
if sw.IsRunning() {
sw.startInitPeer(peer)
}
2015-07-19 14:49:13 -07:00
log.Notice("Added peer", "peer", peer)
return peer, nil
}
// startInitPeer starts peer and then announces it to every registered
// reactor. The peer is started first, so reactors see it after Start has
// been called on it.
func (sw *Switch) startInitPeer(peer *Peer) {
	peer.Start()               // spawn send/recv routines
	sw.addPeerToReactors(peer) // run AddPeer on each reactor
}
2014-10-22 17:20:44 -07:00
func (sw *Switch) DialPeerWithAddress(addr *NetAddress) (*Peer, error) {
2015-07-19 14:49:13 -07:00
log.Info("Dialing address", "address", addr)
2015-04-25 18:01:02 -07:00
sw.dialing.Set(addr.IP.String(), addr)
conn, err := addr.DialTimeout(peerDialTimeoutSeconds * time.Second)
2015-04-25 18:01:02 -07:00
sw.dialing.Delete(addr.IP.String())
if err != nil {
2015-07-19 14:49:13 -07:00
log.Info("Failed dialing address", "address", addr, "error", err)
return nil, err
}
2014-10-22 17:20:44 -07:00
peer, err := sw.AddPeerWithConnection(conn, true)
if err != nil {
2015-07-19 14:49:13 -07:00
log.Info("Failed adding peer", "address", addr, "conn", conn, "error", err)
return nil, err
}
2015-07-19 14:49:13 -07:00
log.Notice("Dialed and added peer", "address", addr, "peer", peer)
return peer, nil
}
2014-10-22 17:20:44 -07:00
func (sw *Switch) IsDialing(addr *NetAddress) bool {
2015-04-25 18:01:02 -07:00
return sw.dialing.Has(addr.IP.String())
2014-07-15 15:54:33 -07:00
}
// Broadcast runs a go routine for each attempted send, which will block
// trying to send for defaultSendTimeoutSeconds. Returns a channel
// which receives success values for each attempted send (false if times out)
func (sw *Switch) Broadcast(chID byte, msg interface{}) chan bool {
successChan := make(chan bool, len(sw.peers.List()))
log.Info("Broadcast", "channel", chID, "msg", msg)
2014-10-22 17:20:44 -07:00
for _, peer := range sw.peers.List() {
2015-04-17 11:08:03 -07:00
go func(peer *Peer) {
success := peer.Send(chID, msg)
successChan <- success
2015-04-17 11:08:03 -07:00
}(peer)
}
return successChan
}
2014-07-15 15:54:33 -07:00
// Returns the count of outbound/inbound and outbound-dialing peers.
2014-10-22 17:20:44 -07:00
func (sw *Switch) NumPeers() (outbound, inbound, dialing int) {
peers := sw.peers.List()
2014-07-10 22:14:23 -07:00
for _, peer := range peers {
if peer.outbound {
2014-07-15 15:54:33 -07:00
outbound++
} else {
inbound++
2014-07-10 22:14:23 -07:00
}
}
2014-10-22 17:20:44 -07:00
dialing = sw.dialing.Size()
2014-07-10 22:14:23 -07:00
return
}
2014-10-22 17:20:44 -07:00
// Peers exposes the switch's peer set through the IPeerSet interface. The
// live set is returned, not a copy.
func (sw *Switch) Peers() IPeerSet {
	return sw.peers
}
// Disconnect from a peer due to external error.
// TODO: make record depending on reason.
2014-10-22 17:20:44 -07:00
func (sw *Switch) StopPeerForError(peer *Peer, reason interface{}) {
2015-07-19 14:49:13 -07:00
log.Notice("Stopping peer for error", "peer", peer, "error", reason)
2014-10-22 17:20:44 -07:00
sw.peers.Remove(peer)
peer.Stop()
2015-04-22 13:21:45 -07:00
sw.removePeerFromReactors(peer, reason)
}
// Disconnect from a peer gracefully.
// TODO: handle graceful disconnects.
2014-10-22 17:20:44 -07:00
func (sw *Switch) StopPeerGracefully(peer *Peer) {
2015-07-19 14:49:13 -07:00
log.Notice("Stopping peer gracefully")
2014-10-22 17:20:44 -07:00
sw.peers.Remove(peer)
peer.Stop()
2015-04-22 13:21:45 -07:00
sw.removePeerFromReactors(peer, nil)
2014-09-14 15:37:32 -07:00
}
func (sw *Switch) addPeerToReactors(peer *Peer) {
2015-03-25 02:36:59 -07:00
for _, reactor := range sw.reactors {
2014-09-14 15:37:32 -07:00
reactor.AddPeer(peer)
}
}
2015-04-22 13:21:45 -07:00
func (sw *Switch) removePeerFromReactors(peer *Peer, reason interface{}) {
2014-10-22 17:20:44 -07:00
for _, reactor := range sw.reactors {
2014-09-14 15:37:32 -07:00
reactor.RemovePeer(peer, reason)
}
}
2015-04-01 14:52:25 -07:00
func (sw *Switch) listenerRoutine(l Listener) {
for {
inConn, ok := <-l.Connections()
if !ok {
break
}
2015-07-12 11:10:13 -07:00
// ignore connection if we already have enough
2015-07-13 16:00:01 -07:00
if maxNumPeers <= sw.peers.Size() {
2015-07-19 14:49:13 -07:00
log.Info("Ignoring inbound connection: already have enough peers", "address", inConn.RemoteAddr().String(), "numPeers", sw.peers.Size(), "max", maxNumPeers)
2015-07-12 11:10:13 -07:00
continue
}
2015-07-13 13:17:47 -07:00
// Ignore connections from IP ranges for which we have too many
2015-07-12 10:54:34 -07:00
if sw.peers.HasMaxForIPRange(inConn) {
2015-07-19 14:49:13 -07:00
log.Info("Ignoring inbound connection: already have enough peers for that IP range", "address", inConn.RemoteAddr().String())
2015-07-12 10:54:34 -07:00
continue
}
2015-04-01 14:52:25 -07:00
// New inbound connection!
2015-07-12 11:10:13 -07:00
_, err := sw.AddPeerWithConnection(inConn, false)
2015-04-01 14:52:25 -07:00
if err != nil {
2015-07-19 14:49:13 -07:00
log.Notice("Ignoring inbound connection: error on AddPeerWithConnection", "address", inConn.RemoteAddr().String(), "error", err)
2015-04-01 14:52:25 -07:00
continue
}
2015-07-12 11:10:13 -07:00
// NOTE: We don't yet have the listening port of the
2015-04-01 14:52:25 -07:00
// remote (if they have a listener at all).
2015-07-12 11:10:13 -07:00
// The peerHandshake will handle that
2015-04-01 14:52:25 -07:00
}
// cleanup
}
//-----------------------------------------------------------------------------
// SwitchEventNewPeer carries the peer concerned by a "new peer" event.
// NOTE(review): nothing in this file constructs or emits this type;
// presumably it is meant to be fired when a peer is added -- confirm.
type SwitchEventNewPeer struct {
	Peer *Peer
}
// SwitchEventDonePeer carries a peer together with an associated error value.
// NOTE(review): nothing in this file constructs or emits this type;
// presumably it is meant to be fired when a peer is stopped, with Error
// holding the stop reason (nil when graceful) -- confirm.
type SwitchEventDonePeer struct {
	Peer  *Peer
	Error interface{}
}