p2p: Integrate new Transport

We are swapping the exisiting listener implementation with the newly
introduced Transport and its default implementation MultiplexTransport,
removing a large chunk of old connection setup and handling scattered
over the Peer and Switch code. The Switch requires a Transport now and
handles externally passed Peer filters.
This commit is contained in:
Alexander Simmerl 2018-09-18 22:14:40 +02:00
parent be5d68ea4f
commit bdd01310a0
16 changed files with 760 additions and 905 deletions

View File

@ -42,7 +42,7 @@ func newBlockchainReactor(logger log.Logger, maxBlockHeight int64) *BlockchainRe
bcReactor.SetLogger(logger.With("module", "blockchain"))
// Next: we need to set a switch in order for peers to be added in
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig())
bcReactor.Switch = p2p.NewSwitch(cfg.DefaultP2PConfig(), nil)
// Lastly: let's add some blocks in
for blockHeight := int64(1); blockHeight <= maxBlockHeight; blockHeight++ {

View File

@ -39,7 +39,13 @@ func TestByzantine(t *testing.T) {
switches := make([]*p2p.Switch, N)
p2pLogger := logger.With("module", "p2p")
for i := 0; i < N; i++ {
switches[i] = p2p.NewSwitch(config.P2P)
switches[i] = p2p.MakeSwitch(
config.P2P,
i,
"foo", "1.0.0",
func(i int, sw *p2p.Switch) *p2p.Switch {
return sw
})
switches[i].SetLogger(p2pLogger.With("validator", i))
}

View File

@ -126,9 +126,12 @@ type Node struct {
privValidator types.PrivValidator // local node's validator key
// network
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeKey *p2p.NodeKey // our node privkey
transport *p2p.MultiplexTransport
sw *p2p.Switch // p2p connections
addrBook pex.AddrBook // known peers
nodeInfo p2p.NodeInfo
nodeKey *p2p.NodeKey // our node privkey
isListening bool
// services
eventBus *types.EventBus // pub/sub for services
@ -301,70 +304,6 @@ func NewNode(config *cfg.Config,
consensusReactor := cs.NewConsensusReactor(consensusState, fastSync)
consensusReactor.SetLogger(consensusLogger)
p2pLogger := logger.With("module", "p2p")
sw := p2p.NewSwitch(config.P2P, p2p.WithMetrics(p2pMetrics))
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
// Optionally, start the pex reactor
//
// TODO:
//
// We need to set Seeds and PersistentPeers on the switch,
// since it needs to be able to use these (and their DNS names)
// even if the PEX is off. We can include the DNS name in the NetAddress,
// but it would still be nice to have a clear list of the current "PersistentPeers"
// somewhere that we can return with net_info.
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
if config.P2P.PexReactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewPEXReactor(addrBook,
&pex.PEXReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode,
})
pexReactor.SetLogger(p2pLogger)
sw.AddReactor("PEX", pexReactor)
}
sw.SetAddrBook(addrBook)
// Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer.
// XXX: Query format subject to change
if config.FilterPeers {
// NOTE: addr is ip:port
sw.SetAddrFilter(func(addr net.Addr) error {
resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: fmt.Sprintf("/p2p/filter/addr/%s", addr.String())})
if err != nil {
return err
}
if resQuery.IsErr() {
return fmt.Errorf("Error querying abci app: %v", resQuery)
}
return nil
})
sw.SetIDFilter(func(id p2p.ID) error {
resQuery, err := proxyApp.Query().QuerySync(abci.RequestQuery{Path: fmt.Sprintf("/p2p/filter/id/%s", id)})
if err != nil {
return err
}
if resQuery.IsErr() {
return fmt.Errorf("Error querying abci app: %v", resQuery)
}
return nil
})
}
eventBus := types.NewEventBus()
eventBus.SetLogger(logger.With("module", "events"))
@ -394,6 +333,113 @@ func NewNode(config *cfg.Config,
indexerService := txindex.NewIndexerService(txIndexer, eventBus)
indexerService.SetLogger(logger.With("module", "txindex"))
var (
p2pLogger = logger.With("module", "p2p")
nodeInfo = makeNodeInfo(config, nodeKey.ID(), txIndexer, genDoc.ChainID)
)
// Setup Transport.
var (
transport = p2p.NewMultiplexTransport(nodeInfo, *nodeKey)
connFilters = []p2p.ConnFilterFunc{}
peerFilters = []p2p.PeerFilterFunc{}
)
if !config.P2P.AllowDuplicateIP {
connFilters = append(connFilters, p2p.ConnDuplicateIPFilter())
}
// Filter peers by addr or pubkey with an ABCI query.
// If the query return code is OK, add peer.
// XXX: Query format subject to change
if config.FilterPeers {
connFilters = append(
connFilters,
// ABCI query for address filtering.
func(_ p2p.ConnSet, c net.Conn, _ []net.IP) error {
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/addr/%s", c.RemoteAddr().String()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("Error querying abci app: %v", res)
}
return nil
},
)
peerFilters = append(
peerFilters,
// ABCI query for ID filtering.
func(_ p2p.IPeerSet, p p2p.Peer) error {
res, err := proxyApp.Query().QuerySync(abci.RequestQuery{
Path: fmt.Sprintf("/p2p/filter/id/%s", p.ID()),
})
if err != nil {
return err
}
if res.IsErr() {
return fmt.Errorf("Error querying abci app: %v", res)
}
return nil
},
)
}
p2p.MultiplexTransportConnFilters(connFilters...)(transport)
// Setup Switch.
sw := p2p.NewSwitch(
config.P2P,
transport,
p2p.WithMetrics(p2pMetrics),
p2p.SwitchPeerFilters(peerFilters...),
)
sw.SetLogger(p2pLogger)
sw.AddReactor("MEMPOOL", mempoolReactor)
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
sw.AddReactor("EVIDENCE", evidenceReactor)
sw.SetNodeInfo(nodeInfo)
sw.SetNodeKey(nodeKey)
p2pLogger.Info("P2P Node ID", "ID", nodeKey.ID(), "file", config.NodeKeyFile())
// Optionally, start the pex reactor
//
// TODO:
//
// We need to set Seeds and PersistentPeers on the switch,
// since it needs to be able to use these (and their DNS names)
// even if the PEX is off. We can include the DNS name in the NetAddress,
// but it would still be nice to have a clear list of the current "PersistentPeers"
// somewhere that we can return with net_info.
//
// If PEX is on, it should handle dialing the seeds. Otherwise the switch does it.
// Note we currently use the addrBook regardless at least for AddOurAddress
addrBook := pex.NewAddrBook(config.P2P.AddrBookFile(), config.P2P.AddrBookStrict)
// Add ourselves to addrbook to prevent dialing ourselves
addrBook.AddOurAddress(nodeInfo.NetAddress())
addrBook.SetLogger(p2pLogger.With("book", config.P2P.AddrBookFile()))
if config.P2P.PexReactor {
// TODO persistent peers ? so we can have their DNS addrs saved
pexReactor := pex.NewPEXReactor(addrBook,
&pex.PEXReactorConfig{
Seeds: splitAndTrimEmpty(config.P2P.Seeds, ",", " "),
SeedMode: config.P2P.SeedMode,
})
pexReactor.SetLogger(p2pLogger)
sw.AddReactor("PEX", pexReactor)
}
sw.SetAddrBook(addrBook)
// run the profile server
profileHost := config.ProfListenAddress
if profileHost != "" {
@ -407,9 +453,11 @@ func NewNode(config *cfg.Config,
genesisDoc: genDoc,
privValidator: privValidator,
sw: sw,
addrBook: addrBook,
nodeKey: nodeKey,
transport: transport,
sw: sw,
addrBook: addrBook,
nodeInfo: nodeInfo,
nodeKey: nodeKey,
stateDB: stateDB,
blockStore: blockStore,
@ -441,21 +489,6 @@ func (n *Node) OnStart() error {
return err
}
// Create & add listener
l := p2p.NewDefaultListener(
n.config.P2P.ListenAddress,
n.config.P2P.ExternalAddress,
n.config.P2P.UPNP,
n.Logger.With("module", "p2p"))
n.sw.AddListener(l)
nodeInfo := n.makeNodeInfo(n.nodeKey.ID())
n.sw.SetNodeInfo(nodeInfo)
n.sw.SetNodeKey(n.nodeKey)
// Add ourselves to addrbook to prevent dialing ourselves
n.addrBook.AddOurAddress(nodeInfo.NetAddress())
// Add private IDs to addrbook to block those peers being added
n.addrBook.AddPrivateIDs(splitAndTrimEmpty(n.config.P2P.PrivatePeerIDs, ",", " "))
@ -474,6 +507,17 @@ func (n *Node) OnStart() error {
n.prometheusSrv = n.startPrometheusServer(n.config.Instrumentation.PrometheusListenAddr)
}
// Start the transport.
addr, err := p2p.NewNetAddressStringWithOptionalID(n.config.P2P.ListenAddress)
if err != nil {
return err
}
if err := n.transport.Listen(*addr); err != nil {
return err
}
n.isListening = true
// Start the switch (the P2P server).
err = n.sw.Start()
if err != nil {
@ -506,6 +550,12 @@ func (n *Node) OnStop() {
// TODO: gracefully disconnect from peers.
n.sw.Stop()
if err := n.transport.Close(); err != nil {
n.Logger.Error("Error closing transport", "err", err)
}
n.isListening = false
// finally stop the listeners / external services
for _, l := range n.rpcListeners {
n.Logger.Info("Closing rpc listener", "listener", l)
@ -536,13 +586,6 @@ func (n *Node) RunForever() {
})
}
// AddListener adds a listener to accept inbound peer connections.
// It should be called before starting the Node.
// The first listener is the primary listener (in NodeInfo)
func (n *Node) AddListener(l p2p.Listener) {
n.sw.AddListener(l)
}
// ConfigureRPC sets all variables in rpccore so they will serve
// rpc calls from this node
func (n *Node) ConfigureRPC() {
@ -551,7 +594,8 @@ func (n *Node) ConfigureRPC() {
rpccore.SetConsensusState(n.consensusState)
rpccore.SetMempool(n.mempoolReactor.Mempool)
rpccore.SetEvidencePool(n.evidencePool)
rpccore.SetSwitch(n.sw)
rpccore.SetP2PPeers(n.sw)
rpccore.SetP2PTransport(n)
rpccore.SetPubKey(n.privValidator.GetPubKey())
rpccore.SetGenesisDoc(n.genesisDoc)
rpccore.SetAddrBook(n.addrBook)
@ -683,14 +727,36 @@ func (n *Node) ProxyApp() proxy.AppConns {
return n.proxyApp
}
func (n *Node) makeNodeInfo(nodeID p2p.ID) p2p.NodeInfo {
//------------------------------------------------------------------------------
func (n *Node) Listeners() []string {
return []string{
fmt.Sprintf("Listener(@%v)", n.config.P2P.ExternalAddress),
}
}
func (n *Node) IsListening() bool {
return n.isListening
}
// NodeInfo returns the Node's Info from the Switch.
func (n *Node) NodeInfo() p2p.NodeInfo {
return n.nodeInfo
}
func makeNodeInfo(
config *cfg.Config,
nodeID p2p.ID,
txIndexer txindex.TxIndexer,
chainID string,
) p2p.NodeInfo {
txIndexerStatus := "on"
if _, ok := n.txIndexer.(*null.TxIndex); ok {
if _, ok := txIndexer.(*null.TxIndex); ok {
txIndexerStatus = "off"
}
nodeInfo := p2p.NodeInfo{
ID: nodeID,
Network: n.genesisDoc.ChainID,
Network: chainID,
Version: version.Version,
Channels: []byte{
bc.BlockchainChannel,
@ -698,7 +764,7 @@ func (n *Node) makeNodeInfo(nodeID p2p.ID) p2p.NodeInfo {
mempl.MempoolChannel,
evidence.EvidenceChannel,
},
Moniker: n.config.Moniker,
Moniker: config.Moniker,
Other: p2p.NodeInfoOther{
AminoVersion: amino.Version,
P2PVersion: p2p.Version,
@ -708,34 +774,26 @@ func (n *Node) makeNodeInfo(nodeID p2p.ID) p2p.NodeInfo {
},
}
if n.config.P2P.PexReactor {
if config.P2P.PexReactor {
nodeInfo.Channels = append(nodeInfo.Channels, pex.PexChannel)
}
rpcListenAddr := n.config.RPC.ListenAddress
rpcListenAddr := config.RPC.ListenAddress
nodeInfo.Other.RPCAddress = rpcListenAddr
if !n.sw.IsListening() {
return nodeInfo
lAddr := config.P2P.ExternalAddress
if lAddr == "" {
lAddr = config.P2P.ListenAddress
}
p2pListener := n.sw.Listeners()[0]
p2pHost := p2pListener.ExternalAddressHost()
p2pPort := p2pListener.ExternalAddress().Port
nodeInfo.ListenAddr = fmt.Sprintf("%v:%v", p2pHost, p2pPort)
nodeInfo.ListenAddr = lAddr
return nodeInfo
}
//------------------------------------------------------------------------------
// NodeInfo returns the Node's Info from the Switch.
func (n *Node) NodeInfo() p2p.NodeInfo {
return n.sw.NodeInfo()
}
//------------------------------------------------------------------------------
var (
genesisDocKey = []byte("genesisDoc")
)

View File

@ -5,6 +5,98 @@ import (
"net"
)
// ErrFilterTimeout indicates that a filter operation timed out.
type ErrFilterTimeout struct{}
func (e ErrFilterTimeout) Error() string {
return "filter timed out"
}
// ErrRejected indicates that a Peer was rejected carrying additional
// information as to the reason.
type ErrRejected struct {
addr NetAddress
conn net.Conn
err error
id ID
isAuthFailure bool
isDuplicate bool
isFiltered bool
isIncompatible bool
isNodeInfoInvalid bool
isSelf bool
}
// Addr returns the NetAddress for the rejected Peer.
func (e ErrRejected) Addr() NetAddress {
return e.addr
}
func (e ErrRejected) Error() string {
if e.isAuthFailure {
return fmt.Sprintf("auth failure: %s", e.err)
}
if e.isDuplicate {
if e.conn != nil {
return fmt.Sprintf(
"duplicate CONN<%s>: %s",
e.conn.RemoteAddr().String(),
e.err,
)
}
if e.id != "" {
return fmt.Sprintf("duplicate ID<%v>: %s", e.id, e.err)
}
}
if e.isFiltered {
if e.conn != nil {
return fmt.Sprintf(
"filtered CONN<%s>: %s",
e.conn.RemoteAddr().String(),
e.err,
)
}
if e.id != "" {
return fmt.Sprintf("filtered ID<%v>: %s", e.id, e.err)
}
}
if e.isIncompatible {
return fmt.Sprintf("incompatible: %s", e.err)
}
if e.isNodeInfoInvalid {
return fmt.Sprintf("invalid NodeInfo: %s", e.err)
}
if e.isSelf {
return fmt.Sprintf("self ID<%v>", e.id)
}
return fmt.Sprintf("%s", e.err)
}
// IsAuthFailure when Peer authentication was unsuccessful.
func (e ErrRejected) IsAuthFailure() bool { return e.isAuthFailure }
// IsDuplicate when Peer ID or IP are present already.
func (e ErrRejected) IsDuplicate() bool { return e.isDuplicate }
// IsFiltered when Peer ID or IP was filtered.
func (e ErrRejected) IsFiltered() bool { return e.isFiltered }
// IsIncompatible when Peer NodeInfo is not compatible with our own.
func (e ErrRejected) IsIncompatible() bool { return e.isIncompatible }
// IsNodeInfoInvalid when the sent NodeInfo is not valid.
func (e ErrRejected) IsNodeInfoInvalid() bool { return e.isNodeInfoInvalid }
// IsSelf when Peer is our own node.
func (e ErrRejected) IsSelf() bool { return e.isSelf }
// ErrSwitchDuplicatePeerID to be raised when a peer is connecting with a known
// ID.
type ErrSwitchDuplicatePeerID struct {
@ -47,6 +139,13 @@ func (e ErrSwitchAuthenticationFailure) Error() string {
)
}
// ErrTransportClosed is raised when the Transport has been closed.
type ErrTransportClosed struct{}
func (e ErrTransportClosed) Error() string {
return "transport has been closed"
}
//-------------------------------------------------------------------
type ErrNetAddressNoID struct {

View File

@ -1,286 +0,0 @@
package p2p
import (
"fmt"
"net"
"strconv"
"strings"
"time"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/p2p/upnp"
)
// Listener is a network listener for stream-oriented protocols, providing
// convenient methods to get listener's internal and external addresses.
// Clients are supposed to read incoming connections from a channel, returned
// by Connections() method.
type Listener interface {
Connections() <-chan net.Conn
InternalAddress() *NetAddress
ExternalAddress() *NetAddress
ExternalAddressHost() string
String() string
Stop() error
}
// DefaultListener is a cmn.Service, running net.Listener underneath.
// Optionally, UPnP is used upon calling NewDefaultListener to resolve external
// address.
type DefaultListener struct {
cmn.BaseService
listener net.Listener
intAddr *NetAddress
extAddr *NetAddress
connections chan net.Conn
}
var _ Listener = (*DefaultListener)(nil)
const (
numBufferedConnections = 10
defaultExternalPort = 8770
tryListenSeconds = 5
)
func splitHostPort(addr string) (host string, port int) {
host, portStr, err := net.SplitHostPort(addr)
if err != nil {
panic(err)
}
port, err = strconv.Atoi(portStr)
if err != nil {
panic(err)
}
return host, port
}
// NewDefaultListener creates a new DefaultListener on lAddr, optionally trying
// to determine external address using UPnP.
func NewDefaultListener(
fullListenAddrString string,
externalAddrString string,
useUPnP bool,
logger log.Logger) Listener {
// Split protocol, address, and port.
protocol, lAddr := cmn.ProtocolAndAddress(fullListenAddrString)
lAddrIP, lAddrPort := splitHostPort(lAddr)
// Create listener
var listener net.Listener
var err error
for i := 0; i < tryListenSeconds; i++ {
listener, err = net.Listen(protocol, lAddr)
if err == nil {
break
} else if i < tryListenSeconds-1 {
time.Sleep(time.Second * 1)
}
}
if err != nil {
panic(err)
}
// Actual listener local IP & port
listenerIP, listenerPort := splitHostPort(listener.Addr().String())
logger.Info("Local listener", "ip", listenerIP, "port", listenerPort)
// Determine internal address...
var intAddr *NetAddress
intAddr, err = NewNetAddressStringWithOptionalID(lAddr)
if err != nil {
panic(err)
}
inAddrAny := lAddrIP == "" || lAddrIP == "0.0.0.0"
// Determine external address.
var extAddr *NetAddress
if externalAddrString != "" {
var err error
extAddr, err = NewNetAddressStringWithOptionalID(externalAddrString)
if err != nil {
panic(fmt.Sprintf("Error in ExternalAddress: %v", err))
}
}
// If the lAddrIP is INADDR_ANY, try UPnP.
if extAddr == nil && useUPnP && inAddrAny {
extAddr = getUPNPExternalAddress(lAddrPort, listenerPort, logger)
}
// Otherwise just use the local address.
if extAddr == nil {
defaultToIPv4 := inAddrAny
extAddr = getNaiveExternalAddress(defaultToIPv4, listenerPort, false, logger)
}
if extAddr == nil {
panic("Could not determine external address!")
}
dl := &DefaultListener{
listener: listener,
intAddr: intAddr,
extAddr: extAddr,
connections: make(chan net.Conn, numBufferedConnections),
}
dl.BaseService = *cmn.NewBaseService(logger, "DefaultListener", dl)
err = dl.Start() // Started upon construction
if err != nil {
logger.Error("Error starting base service", "err", err)
}
return dl
}
// OnStart implements cmn.Service by spinning a goroutine, listening for new
// connections.
func (l *DefaultListener) OnStart() error {
if err := l.BaseService.OnStart(); err != nil {
return err
}
go l.listenRoutine()
return nil
}
// OnStop implements cmn.Service by closing the listener.
func (l *DefaultListener) OnStop() {
l.BaseService.OnStop()
l.listener.Close() // nolint: errcheck
}
// Accept connections and pass on the channel.
func (l *DefaultListener) listenRoutine() {
for {
conn, err := l.listener.Accept()
if !l.IsRunning() {
break // Go to cleanup
}
// listener wasn't stopped,
// yet we encountered an error.
if err != nil {
panic(err)
}
l.connections <- conn
}
// Cleanup
close(l.connections)
for range l.connections {
// Drain
}
}
// Connections returns a channel of inbound connections.
// It gets closed when the listener closes.
// It is the callers responsibility to close any connections received
// over this channel.
func (l *DefaultListener) Connections() <-chan net.Conn {
return l.connections
}
// InternalAddress returns the internal NetAddress (address used for
// listening).
func (l *DefaultListener) InternalAddress() *NetAddress {
return l.intAddr
}
// ExternalAddress returns the external NetAddress (publicly available,
// determined using either UPnP or local resolver).
func (l *DefaultListener) ExternalAddress() *NetAddress {
return l.extAddr
}
// ExternalAddressHost returns the external NetAddress IP string. If an IP is
// IPv6, it's wrapped in brackets ("[2001:db8:1f70::999:de8:7648:6e8]").
func (l *DefaultListener) ExternalAddressHost() string {
ip := l.ExternalAddress().IP
if isIpv6(ip) {
// Means it's ipv6, so format it with brackets
return "[" + ip.String() + "]"
}
return ip.String()
}
func (l *DefaultListener) String() string {
return fmt.Sprintf("Listener(@%v)", l.extAddr)
}
/* external address helpers */
// UPNP external address discovery & port mapping
func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *NetAddress {
logger.Info("Getting UPNP external address")
nat, err := upnp.Discover()
if err != nil {
logger.Info("Could not perform UPNP discover", "err", err)
return nil
}
ext, err := nat.GetExternalAddress()
if err != nil {
logger.Info("Could not get UPNP external address", "err", err)
return nil
}
// UPnP can't seem to get the external port, so let's just be explicit.
if externalPort == 0 {
externalPort = defaultExternalPort
}
externalPort, err = nat.AddPortMapping("tcp", externalPort, internalPort, "tendermint", 0)
if err != nil {
logger.Info("Could not add UPNP port mapping", "err", err)
return nil
}
logger.Info("Got UPNP external address", "address", ext)
return NewNetAddressIPPort(ext, uint16(externalPort))
}
func isIpv6(ip net.IP) bool {
v4 := ip.To4()
if v4 != nil {
return false
}
ipString := ip.String()
// Extra check just to be sure it's IPv6
return (strings.Contains(ipString, ":") && !strings.Contains(ipString, "."))
}
// TODO: use syscalls: see issue #712
func getNaiveExternalAddress(defaultToIPv4 bool, port int, settleForLocal bool, logger log.Logger) *NetAddress {
addrs, err := net.InterfaceAddrs()
if err != nil {
panic(fmt.Sprintf("Could not fetch interface addresses: %v", err))
}
for _, a := range addrs {
ipnet, ok := a.(*net.IPNet)
if !ok {
continue
}
if defaultToIPv4 || !isIpv6(ipnet.IP) {
v4 := ipnet.IP.To4()
if v4 == nil || (!settleForLocal && v4[0] == 127) {
// loopback
continue
}
} else if !settleForLocal && ipnet.IP.IsLoopback() {
// IPv6, check for loopback
continue
}
return NewNetAddressIPPort(ipnet.IP, uint16(port))
}
// try again, but settle for local
logger.Info("Node may not be connected to internet. Settling for local address")
return getNaiveExternalAddress(defaultToIPv4, port, true, logger)
}

View File

@ -1,79 +0,0 @@
package p2p
import (
"bytes"
"net"
"strings"
"testing"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/log"
)
func TestListener(t *testing.T) {
// Create a listener
l := NewDefaultListener("tcp://:8001", "", false, log.TestingLogger())
// Dial the listener
lAddr := l.ExternalAddress()
connOut, err := lAddr.Dial()
if err != nil {
t.Fatalf("Could not connect to listener address %v", lAddr)
} else {
t.Logf("Created a connection to listener address %v", lAddr)
}
connIn, ok := <-l.Connections()
if !ok {
t.Fatalf("Could not get inbound connection from listener")
}
msg := []byte("hi!")
go func() {
_, err := connIn.Write(msg)
if err != nil {
t.Error(err)
}
}()
b := make([]byte, 32)
n, err := connOut.Read(b)
if err != nil {
t.Fatalf("Error reading off connection: %v", err)
}
b = b[:n]
if !bytes.Equal(msg, b) {
t.Fatalf("Got %s, expected %s", b, msg)
}
// Close the server, no longer needed.
l.Stop()
}
func TestExternalAddress(t *testing.T) {
{
// Create a listener with no external addr. Should default
// to local ipv4.
l := NewDefaultListener("tcp://:8001", "", false, log.TestingLogger())
lAddr := l.ExternalAddress().String()
_, _, err := net.SplitHostPort(lAddr)
require.Nil(t, err)
spl := strings.Split(lAddr, ".")
require.Equal(t, len(spl), 4)
l.Stop()
}
{
// Create a listener with set external ipv4 addr.
setExAddr := "8.8.8.8:8080"
l := NewDefaultListener("tcp://:8001", setExAddr, false, log.TestingLogger())
lAddr := l.ExternalAddress().String()
require.Equal(t, lAddr, setExAddr)
l.Stop()
}
{
// Invalid external addr causes panic
setExAddr := "awrlsckjnal:8080"
require.Panics(t, func() { NewDefaultListener("tcp://:8001", setExAddr, false, log.TestingLogger()) })
}
}

View File

@ -6,7 +6,6 @@ import (
"sync/atomic"
"time"
crypto "github.com/tendermint/tendermint/crypto"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
@ -130,87 +129,6 @@ func newPeer(
return p
}
func newOutboundPeerConn(
addr *NetAddress,
config *config.P2PConfig,
persistent bool,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
conn, err := dial(addr, config)
if err != nil {
return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
}
pc, err := newPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
if err != nil {
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
}
return peerConn{}, err
}
// ensure dialed ID matches connection ID
if addr.ID != pc.ID() {
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
}
return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()}
}
return pc, nil
}
func newInboundPeerConn(
conn net.Conn,
config *config.P2PConfig,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
// TODO: issue PoW challenge
return newPeerConn(conn, config, false, false, ourNodePrivKey, nil)
}
func newPeerConn(
rawConn net.Conn,
cfg *config.P2PConfig,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
originalAddr *NetAddress,
) (pc peerConn, err error) {
conn := rawConn
// Fuzz connection
if cfg.TestFuzz {
// so we have time to do peer handshakes and get set up
conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig)
}
// Set deadline for secret handshake
dl := time.Now().Add(cfg.HandshakeTimeout)
if err := conn.SetDeadline(dl); err != nil {
return pc, cmn.ErrorWrap(
err,
"Error setting deadline while encrypting connection",
)
}
// Encrypt connection
conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey)
if err != nil {
return pc, cmn.ErrorWrap(err, "Error creating peer")
}
// Only the information we already have
return peerConn{
config: cfg,
outbound: outbound,
persistent: persistent,
conn: conn,
originalAddr: originalAddr,
}, nil
}
//---------------------------------------------------
// Implements cmn.Service
@ -399,18 +317,6 @@ func (p *peer) String() string {
//------------------------------------------------------------------
// helper funcs
func dial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
if cfg.TestDialFail {
return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)")
}
conn, err := addr.DialTimeout(cfg.DialTimeout)
if err != nil {
return nil, err
}
return conn, nil
}
func createMConnection(
conn net.Conn,
p *peer,

View File

@ -1,6 +1,7 @@
package p2p
import (
"fmt"
golog "log"
"net"
"testing"
@ -76,7 +77,7 @@ func createOutboundPeerAndPerformHandshake(
}
reactorsByCh := map[byte]Reactor{testCh: NewTestReactor(chDescs, true)}
pk := ed25519.GenPrivKey()
pc, err := newOutboundPeerConn(addr, config, false, pk)
pc, err := testOutboundPeerConn(addr, config, false, pk)
if err != nil {
return nil, err
}
@ -96,6 +97,48 @@ func createOutboundPeerAndPerformHandshake(
return p, nil
}
func testDial(addr *NetAddress, cfg *config.P2PConfig) (net.Conn, error) {
if cfg.TestDialFail {
return nil, fmt.Errorf("dial err (peerConfig.DialFail == true)")
}
conn, err := addr.DialTimeout(cfg.DialTimeout)
if err != nil {
return nil, err
}
return conn, nil
}
func testOutboundPeerConn(
addr *NetAddress,
config *config.P2PConfig,
persistent bool,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
conn, err := testDial(addr, config)
if err != nil {
return peerConn{}, cmn.ErrorWrap(err, "Error creating peer")
}
pc, err := testPeerConn(conn, config, true, persistent, ourNodePrivKey, addr)
if err != nil {
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
}
return peerConn{}, err
}
// ensure dialed ID matches connection ID
if addr.ID != pc.ID() {
if cerr := conn.Close(); cerr != nil {
return peerConn{}, cmn.ErrorWrap(err, cerr.Error())
}
return peerConn{}, ErrSwitchAuthenticationFailure{addr, pc.ID()}
}
return pc, nil
}
type remotePeer struct {
PrivKey crypto.PrivKey
Config *config.P2PConfig
@ -143,19 +186,19 @@ func (rp *remotePeer) accept(l net.Listener) {
golog.Fatalf("Failed to accept conn: %+v", err)
}
pc, err := newInboundPeerConn(conn, rp.Config, rp.PrivKey)
pc, err := testInboundPeerConn(conn, rp.Config, rp.PrivKey)
if err != nil {
golog.Fatalf("Failed to create a peer: %+v", err)
}
_, err = pc.HandshakeTimeout(NodeInfo{
_, err = handshake(pc.conn, time.Second, NodeInfo{
ID: rp.Addr().ID,
Moniker: "remote_peer",
Network: "testing",
Version: "123.123.123",
ListenAddr: l.Addr().String(),
Channels: rp.channels,
}, 1*time.Second)
})
if err != nil {
golog.Fatalf("Failed to perform handshake: %+v", err)
}

View File

@ -109,9 +109,7 @@ func TestPEXReactorRunning(t *testing.T) {
addOtherNodeAddrToAddrBook(1, 0)
addOtherNodeAddrToAddrBook(2, 1)
for i, sw := range switches {
sw.AddListener(p2p.NewDefaultListener("tcp://"+sw.NodeInfo().ListenAddr, "", false, logger.With("pex", i)))
for _, sw := range switches {
err := sw.Start() // start switch and reactors
require.Nil(t, err)
}
@ -474,9 +472,6 @@ func testCreatePeerWithConfig(dir string, id int, config *PEXReactorConfig) *p2p
return sw
},
)
peer.AddListener(
p2p.NewDefaultListener("tcp://"+peer.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
)
return peer
}
@ -510,9 +505,6 @@ func testCreateSeed(dir string, id int, knownAddrs, srcAddrs []*p2p.NetAddress)
return sw
},
)
seed.AddListener(
p2p.NewDefaultListener("tcp://"+seed.NodeInfo().ListenAddr, "", false, log.TestingLogger()),
)
return seed
}

View File

@ -3,7 +3,6 @@ package p2p
import (
"fmt"
"math"
"net"
"sync"
"time"
@ -42,6 +41,10 @@ type AddrBook interface {
Save()
}
// PeerFilterFunc to be implemented by filter hooks after a new Peer has been
// fully setup.
type PeerFilterFunc func(IPeerSet, Peer) error
//-----------------------------------------------------------------------------
// Switch handles peer connections and exposes an API to receive incoming messages
@ -52,7 +55,6 @@ type Switch struct {
cmn.BaseService
config *config.P2PConfig
listeners []Listener
reactors map[string]Reactor
chDescs []*conn.ChannelDescriptor
reactorsByCh map[byte]Reactor
@ -63,8 +65,10 @@ type Switch struct {
nodeKey *NodeKey // our node privkey
addrBook AddrBook
filterConnByAddr func(net.Addr) error
filterConnByID func(ID) error
transport Transport
filterTimeout time.Duration
peerFilters []PeerFilterFunc
mConfig conn.MConnConfig
@ -77,16 +81,22 @@ type Switch struct {
type SwitchOption func(*Switch)
// NewSwitch creates a new Switch with the given config.
func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
func NewSwitch(
cfg *config.P2PConfig,
transport Transport,
options ...SwitchOption,
) *Switch {
sw := &Switch{
config: cfg,
reactors: make(map[string]Reactor),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
reconnecting: cmn.NewCMap(),
metrics: NopMetrics(),
config: cfg,
reactors: make(map[string]Reactor),
chDescs: make([]*conn.ChannelDescriptor, 0),
reactorsByCh: make(map[byte]Reactor),
peers: NewPeerSet(),
dialing: cmn.NewCMap(),
reconnecting: cmn.NewCMap(),
metrics: NopMetrics(),
transport: transport,
filterTimeout: defaultFilterTimeout,
}
// Ensure we have a completely undeterministic PRNG.
@ -109,6 +119,16 @@ func NewSwitch(cfg *config.P2PConfig, options ...SwitchOption) *Switch {
return sw
}
// SwitchFilterTimeout sets the timeout used for peer filters.
func SwitchFilterTimeout(timeout time.Duration) SwitchOption {
return func(sw *Switch) { sw.filterTimeout = timeout }
}
// SwitchPeerFilters sets the filters for rejection of new peers.
func SwitchPeerFilters(filters ...PeerFilterFunc) SwitchOption {
return func(sw *Switch) { sw.peerFilters = filters }
}
// WithMetrics sets the metrics.
func WithMetrics(metrics *Metrics) SwitchOption {
return func(sw *Switch) { sw.metrics = metrics }
@ -148,24 +168,6 @@ func (sw *Switch) Reactor(name string) Reactor {
return sw.reactors[name]
}
// AddListener adds the given listener to the switch for listening to incoming peer connections.
// NOTE: Not goroutine safe.
func (sw *Switch) AddListener(l Listener) {
sw.listeners = append(sw.listeners, l)
}
// Listeners returns the list of listeners the switch listens on.
// NOTE: Not goroutine safe.
func (sw *Switch) Listeners() []Listener {
return sw.listeners
}
// IsListening returns true if the switch has at least one listener.
// NOTE: Not goroutine safe.
func (sw *Switch) IsListening() bool {
return len(sw.listeners) > 0
}
// SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes.
// NOTE: Not goroutine safe.
func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) {
@ -187,7 +189,7 @@ func (sw *Switch) SetNodeKey(nodeKey *NodeKey) {
//---------------------------------------------------------------------
// Service start/stop
// OnStart implements BaseService. It starts all the reactors, peers, and listeners.
// OnStart implements BaseService. It starts all the reactors and peers.
func (sw *Switch) OnStart() error {
// Start reactors
for _, reactor := range sw.reactors {
@ -196,25 +198,21 @@ func (sw *Switch) OnStart() error {
return cmn.ErrorWrap(err, "failed to start %v", reactor)
}
}
// Start listeners
for _, listener := range sw.listeners {
go sw.listenerRoutine(listener)
}
// Start accepting Peers.
go sw.acceptRoutine()
return nil
}
// OnStop implements BaseService. It stops all listeners, peers, and reactors.
// OnStop implements BaseService. It stops all peers and reactors.
func (sw *Switch) OnStop() {
// Stop listeners
for _, listener := range sw.listeners {
listener.Stop()
}
sw.listeners = nil
// Stop peers
for _, peer := range sw.peers.List() {
peer.Stop()
sw.peers.Remove(peer)
for _, p := range sw.peers.List() {
p.Stop()
sw.peers.Remove(p)
}
// Stop reactors
sw.Logger.Debug("Switch: Stopping reactors")
for _, reactor := range sw.reactors {
@ -459,42 +457,46 @@ func (sw *Switch) IsDialingOrExistingAddress(addr *NetAddress) bool {
(!sw.config.AllowDuplicateIP && sw.peers.HasIP(addr.IP))
}
//------------------------------------------------------------------------------------
// Connection filtering
// FilterConnByAddr returns an error if connecting to the given address is forbidden.
func (sw *Switch) FilterConnByAddr(addr net.Addr) error {
if sw.filterConnByAddr != nil {
return sw.filterConnByAddr(addr)
}
return nil
}
// FilterConnByID returns an error if connecting to the given peer ID is forbidden.
func (sw *Switch) FilterConnByID(id ID) error {
if sw.filterConnByID != nil {
return sw.filterConnByID(id)
}
return nil
}
// SetAddrFilter sets the function for filtering connections by address.
func (sw *Switch) SetAddrFilter(f func(net.Addr) error) {
sw.filterConnByAddr = f
}
// SetIDFilter sets the function for filtering connections by peer ID.
func (sw *Switch) SetIDFilter(f func(ID) error) {
sw.filterConnByID = f
}
//------------------------------------------------------------------------------------
func (sw *Switch) listenerRoutine(l Listener) {
func (sw *Switch) acceptRoutine() {
for {
inConn, ok := <-l.Connections()
if !ok {
p, err := sw.transport.Accept(peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
switch err.(type) {
case ErrRejected:
rErr := err.(ErrRejected)
if rErr.IsSelf() {
// Remove the given address from the address book and add to our addresses
// to avoid dialing in the future.
addr := rErr.Addr()
sw.addrBook.RemoveAddress(&addr)
sw.addrBook.AddOurAddress(&addr)
}
sw.Logger.Info(
"Inbound Peer rejected",
"err", err,
"numPeers", sw.peers.Size(),
)
continue
case *ErrTransportClosed:
sw.Logger.Error(
"Stopped accept routine, as transport is closed",
"numPeers", sw.peers.Size(),
)
default:
sw.Logger.Error(
"Accept on transport errored",
"err", err,
"numPeers", sw.peers.Size(),
)
}
break
}
@ -503,41 +505,25 @@ func (sw *Switch) listenerRoutine(l Listener) {
if in >= sw.config.MaxNumInboundPeers {
sw.Logger.Info(
"Ignoring inbound connection: already have enough inbound peers",
"address", inConn.RemoteAddr().String(),
"address", p.NodeInfo().NetAddress().String(),
"have", in,
"max", sw.config.MaxNumInboundPeers,
)
inConn.Close()
_ = p.Stop()
continue
}
// New inbound connection!
err := sw.addInboundPeerWithConfig(inConn, sw.config)
if err != nil {
sw.Logger.Info("Ignoring inbound connection: error while adding peer", "address", inConn.RemoteAddr().String(), "err", err)
continue
if err := sw.addPeer(p); err != nil {
_ = p.Stop()
sw.Logger.Info(
"Ignoring inbound connection: error while adding peer",
"err", err,
"id", p.ID(),
)
}
}
// cleanup
}
// closes conn if err is returned
func (sw *Switch) addInboundPeerWithConfig(
conn net.Conn,
config *config.P2PConfig,
) error {
peerConn, err := newInboundPeerConn(conn, config, sw.nodeKey.PrivKey)
if err != nil {
conn.Close() // peer is nil
return err
}
if err = sw.addPeer(peerConn); err != nil {
peerConn.CloseConn()
return err
}
return nil
}
// dial the peer; make secret connection; authenticate against the dialed ID;
@ -547,109 +533,88 @@ func (sw *Switch) addInboundPeerWithConfig(
// StopPeerForError is called
func (sw *Switch) addOutboundPeerWithConfig(
addr *NetAddress,
config *config.P2PConfig,
cfg *config.P2PConfig,
persistent bool,
) error {
sw.Logger.Info("Dialing peer", "address", addr)
peerConn, err := newOutboundPeerConn(
addr,
config,
persistent,
sw.nodeKey.PrivKey,
)
// XXX(xla): Remove the leakage of test concerns in implementation.
if cfg.TestDialFail {
go sw.reconnectToPeer(addr)
return fmt.Errorf("dial err (peerConfig.DialFail == true)")
}
p, err := sw.transport.Dial(*addr, peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
persistent: persistent,
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
switch e := err.(type) {
case ErrRejected:
if e.IsSelf() {
// Remove the given address from the address book and add to our addresses
// to avoid dialing in the future.
sw.addrBook.RemoveAddress(addr)
sw.addrBook.AddOurAddress(addr)
}
}
if persistent {
go sw.reconnectToPeer(addr)
}
return err
}
if err := sw.addPeer(peerConn); err != nil {
peerConn.CloseConn()
if err := sw.addPeer(p); err != nil {
_ = p.Stop()
return err
}
return nil
}
// addPeer performs the Tendermint P2P handshake with a peer
// that already has a SecretConnection. If all goes well,
// it starts the peer and adds it to the switch.
// NOTE: This performs a blocking handshake before the peer is added.
// NOTE: If error is returned, caller is responsible for calling
// peer.CloseConn()
func (sw *Switch) addPeer(pc peerConn) error {
addr := pc.conn.RemoteAddr()
if err := sw.FilterConnByAddr(addr); err != nil {
return err
}
// Exchange NodeInfo on the conn
peerNodeInfo, err := pc.HandshakeTimeout(sw.nodeInfo, time.Duration(sw.config.HandshakeTimeout))
if err != nil {
return err
}
peerID := peerNodeInfo.ID
// ensure connection key matches self reported key
connID := pc.ID()
if peerID != connID {
return fmt.Errorf(
"nodeInfo.ID() (%v) doesn't match conn.ID() (%v)",
peerID,
connID,
)
}
// Validate the peers nodeInfo
if err := peerNodeInfo.Validate(); err != nil {
return err
}
// Avoid self
if sw.nodeKey.ID() == peerID {
addr := peerNodeInfo.NetAddress()
// remove the given address from the address book
// and add to our addresses to avoid dialing again
if sw.addrBook != nil {
sw.addrBook.RemoveAddress(addr)
sw.addrBook.AddOurAddress(addr)
}
return ErrSwitchConnectToSelf{addr}
}
func (sw *Switch) filterPeer(p Peer) error {
// Avoid duplicate
if sw.peers.Has(peerID) {
return ErrSwitchDuplicatePeerID{peerID}
if sw.peers.Has(p.ID()) {
return ErrRejected{id: p.ID(), isDuplicate: true}
}
// Check for duplicate connection or peer info IP.
if !sw.config.AllowDuplicateIP &&
(sw.peers.HasIP(pc.RemoteIP()) ||
sw.peers.HasIP(peerNodeInfo.NetAddress().IP)) {
return ErrSwitchDuplicatePeerIP{pc.RemoteIP()}
errc := make(chan error, len(sw.peerFilters))
for _, f := range sw.peerFilters {
go func(f PeerFilterFunc, p Peer, errc chan<- error) {
errc <- f(sw.peers, p)
}(f, p, errc)
}
// Filter peer against ID white list
if err := sw.FilterConnByID(peerID); err != nil {
for i := 0; i < cap(errc); i++ {
select {
case err := <-errc:
if err != nil {
return ErrRejected{id: p.ID(), err: err, isFiltered: true}
}
case <-time.After(sw.filterTimeout):
return ErrFilterTimeout{}
}
}
return nil
}
// addPeer starts up the Peer and adds it to the Switch.
func (sw *Switch) addPeer(p Peer) error {
if err := sw.filterPeer(p); err != nil {
return err
}
// Check version, chain id
if err := sw.nodeInfo.CompatibleWith(peerNodeInfo); err != nil {
return err
}
peer := newPeer(pc, sw.mConfig, peerNodeInfo, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError)
peer.SetLogger(sw.Logger.With("peer", addr))
peer.Logger.Info("Successful handshake with peer", "peerNodeInfo", peerNodeInfo)
p.SetLogger(sw.Logger.With("peer", p.NodeInfo().NetAddress().String))
// All good. Start peer
if sw.IsRunning() {
if err = sw.startInitPeer(peer); err != nil {
if err := sw.startInitPeer(p); err != nil {
return err
}
}
@ -657,25 +622,30 @@ func (sw *Switch) addPeer(pc peerConn) error {
// Add the peer to .peers.
// We start it first so that a peer in the list is safe to Stop.
// It should not err since we already checked peers.Has().
if err := sw.peers.Add(peer); err != nil {
if err := sw.peers.Add(p); err != nil {
return err
}
sw.Logger.Info("Added peer", "peer", p)
sw.metrics.Peers.Add(float64(1))
sw.Logger.Info("Added peer", "peer", peer)
return nil
}
func (sw *Switch) startInitPeer(peer *peer) error {
err := peer.Start() // spawn send/recv routines
func (sw *Switch) startInitPeer(p Peer) error {
err := p.Start() // spawn send/recv routines
if err != nil {
// Should never happen
sw.Logger.Error("Error starting peer", "peer", peer, "err", err)
sw.Logger.Error(
"Error starting peer",
"err", err,
"peer", p,
)
return err
}
for _, reactor := range sw.reactors {
reactor.AddPeer(peer)
reactor.AddPeer(p)
}
return nil

View File

@ -3,7 +3,6 @@ package p2p
import (
"bytes"
"fmt"
"net"
"sync"
"testing"
"time"
@ -11,10 +10,9 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/crypto/ed25519"
"github.com/tendermint/tendermint/libs/log"
"github.com/tendermint/tendermint/config"
"github.com/tendermint/tendermint/p2p/conn"
)
@ -151,35 +149,6 @@ func assertMsgReceivedWithTimeout(t *testing.T, msgBytes []byte, channel byte, r
}
}
func TestConnAddrFilter(t *testing.T) {
s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
c1, c2 := conn.NetPipe()
s1.SetAddrFilter(func(addr net.Addr) error {
if addr.String() == c1.RemoteAddr().String() {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
// connect to good peer
go func() {
err := s1.addPeerWithConnection(c1)
assert.NotNil(t, err, "expected err")
}()
go func() {
err := s2.addPeerWithConnection(c2)
assert.NotNil(t, err, "expected err")
}()
assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
}
func TestSwitchFiltersOutItself(t *testing.T) {
s1 := MakeSwitch(cfg, 1, "127.0.0.1", "123.123.123", initSwitchFunc)
// addr := s1.NodeInfo().NetAddress()
@ -194,11 +163,16 @@ func TestSwitchFiltersOutItself(t *testing.T) {
// addr should be rejected in addPeer based on the same ID
err := s1.DialPeerWithAddress(rp.Addr(), false)
if assert.Error(t, err) {
assert.Equal(t, ErrSwitchConnectToSelf{rp.Addr()}.Error(), err.Error())
if err, ok := err.(ErrRejected); ok {
if !err.IsSelf() {
t.Errorf("expected self to be rejected")
}
} else {
t.Errorf("expected ErrRejected")
}
}
assert.True(t, s1.addrBook.OurAddress(rp.Addr()))
assert.False(t, s1.addrBook.HasAddress(rp.Addr()))
rp.Stop()
@ -206,6 +180,119 @@ func TestSwitchFiltersOutItself(t *testing.T) {
assertNoPeersAfterTimeout(t, s1, 100*time.Millisecond)
}
func TestSwitchPeerFilter(t *testing.T) {
var (
filters = []PeerFilterFunc{
func(_ IPeerSet, _ Peer) error { return nil },
func(_ IPeerSet, _ Peer) error { return fmt.Errorf("denied!") },
func(_ IPeerSet, _ Peer) error { return nil },
}
sw = MakeSwitch(
cfg,
1,
"testing",
"123.123.123",
initSwitchFunc,
SwitchPeerFilters(filters...),
)
)
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
t.Fatal(err)
}
err = sw.addPeer(p)
if err, ok := err.(ErrRejected); ok {
if !err.IsFiltered() {
t.Errorf("expected peer to be filtered")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func TestSwitchPeerFilterTimeout(t *testing.T) {
var (
filters = []PeerFilterFunc{
func(_ IPeerSet, _ Peer) error {
time.Sleep(10 * time.Millisecond)
return nil
},
}
sw = MakeSwitch(
cfg,
1,
"testing",
"123.123.123",
initSwitchFunc,
SwitchFilterTimeout(5*time.Millisecond),
SwitchPeerFilters(filters...),
)
)
defer sw.Stop()
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
t.Fatal(err)
}
err = sw.addPeer(p)
if _, ok := err.(ErrFilterTimeout); !ok {
t.Errorf("expected ErrFilterTimeout")
}
}
func TestSwitchPeerFilterDuplicate(t *testing.T) {
sw := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
// simulate remote peer
rp := &remotePeer{PrivKey: ed25519.GenPrivKey(), Config: cfg}
rp.Start()
defer rp.Stop()
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
})
if err != nil {
t.Fatal(err)
}
if err := sw.addPeer(p); err != nil {
t.Fatal(err)
}
err = sw.addPeer(p)
if err, ok := err.(ErrRejected); ok {
if !err.IsDuplicate() {
t.Errorf("expected peer to be duplicate")
}
} else {
t.Errorf("expected ErrRejected")
}
}
func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) {
time.Sleep(timeout)
if sw.Peers().Size() != 0 {
@ -213,41 +300,6 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration)
}
}
func TestConnIDFilter(t *testing.T) {
s1 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
s2 := MakeSwitch(cfg, 1, "testing", "123.123.123", initSwitchFunc)
defer s1.Stop()
defer s2.Stop()
c1, c2 := conn.NetPipe()
s1.SetIDFilter(func(id ID) error {
if id == s2.nodeInfo.ID {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
s2.SetIDFilter(func(id ID) error {
if id == s1.nodeInfo.ID {
return fmt.Errorf("Error: pipe is blacklisted")
}
return nil
})
go func() {
err := s1.addPeerWithConnection(c1)
assert.NotNil(t, err, "expected error")
}()
go func() {
err := s2.addPeerWithConnection(c2)
assert.NotNil(t, err, "expected error")
}()
assertNoPeersAfterTimeout(t, s1, 400*time.Millisecond)
assertNoPeersAfterTimeout(t, s2, 400*time.Millisecond)
}
func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
assert, require := assert.New(t), require.New(t)
@ -263,19 +315,23 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) {
rp.Start()
defer rp.Stop()
pc, err := newOutboundPeerConn(rp.Addr(), cfg, false, sw.nodeKey.PrivKey)
require.Nil(err)
err = sw.addPeer(pc)
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
reactorsByCh: sw.reactorsByCh,
})
require.Nil(err)
peer := sw.Peers().Get(rp.ID())
require.NotNil(peer)
err = sw.addPeer(p)
require.Nil(err)
require.NotNil(sw.Peers().Get(rp.ID()))
// simulate failure by closing connection
pc.CloseConn()
p.(*peer).CloseConn()
assertNoPeersAfterTimeout(t, sw, 100*time.Millisecond)
assert.False(peer.IsRunning())
assert.False(p.IsRunning())
}
func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
@ -293,17 +349,20 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
rp.Start()
defer rp.Stop()
pc, err := newOutboundPeerConn(rp.Addr(), cfg, true, sw.nodeKey.PrivKey)
// sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey,
p, err := sw.transport.Dial(*rp.Addr(), peerConfig{
chDescs: sw.chDescs,
onPeerError: sw.StopPeerForError,
persistent: true,
reactorsByCh: sw.reactorsByCh,
})
require.Nil(err)
require.Nil(sw.addPeer(pc))
require.Nil(sw.addPeer(p))
peer := sw.Peers().Get(rp.ID())
require.NotNil(peer)
require.NotNil(sw.Peers().Get(rp.ID()))
// simulate failure by closing connection
pc.CloseConn()
p.(*peer).CloseConn()
// TODO: remove sleep, detect the disconnection, wait for reconnect
npeers := sw.Peers().Size()
@ -315,7 +374,7 @@ func TestSwitchReconnectsToPersistentPeer(t *testing.T) {
}
}
assert.NotZero(npeers)
assert.False(peer.IsRunning())
assert.False(p.IsRunning())
// simulate another remote peer
rp = &remotePeer{

View File

@ -3,7 +3,9 @@ package p2p
import (
"fmt"
"net"
"time"
crypto "github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/ed25519"
cmn "github.com/tendermint/tendermint/libs/common"
"github.com/tendermint/tendermint/libs/log"
@ -104,14 +106,32 @@ func Connect2Switches(switches []*Switch, i, j int) {
}
func (sw *Switch) addPeerWithConnection(conn net.Conn) error {
pc, err := newInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey)
pc, err := testInboundPeerConn(conn, sw.config, sw.nodeKey.PrivKey)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
}
return err
}
if err = sw.addPeer(pc); err != nil {
ni, err := handshake(conn, 50*time.Millisecond, sw.nodeInfo)
if err != nil {
if err := conn.Close(); err != nil {
sw.Logger.Error("Error closing connection", "err", err)
}
return err
}
p := newPeer(
pc,
sw.mConfig,
ni,
sw.reactorsByCh,
sw.chDescs,
sw.StopPeerForError,
)
if err = sw.addPeer(p); err != nil {
pc.CloseConn()
return err
}
@ -131,35 +151,99 @@ func StartSwitches(switches []*Switch) error {
return nil
}
func MakeSwitch(cfg *config.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch {
// new switch, add reactors
func MakeSwitch(
cfg *config.P2PConfig,
i int,
network, version string,
initSwitch func(int, *Switch) *Switch,
opts ...SwitchOption,
) *Switch {
var (
nodeKey = NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
ni = NodeInfo{
ID: nodeKey.ID(),
Moniker: fmt.Sprintf("switch%d", i),
Network: network,
Version: version,
ListenAddr: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
Other: NodeInfoOther{
AminoVersion: "1.0",
P2PVersion: "1.0",
ConsensusVersion: "1.0",
RPCVersion: "1.0",
TxIndex: "off",
RPCAddress: fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023),
},
}
)
addr, err := NewNetAddressStringWithOptionalID(
IDAddressString(nodeKey.ID(), ni.ListenAddr),
)
if err != nil {
panic(err)
}
t := NewMultiplexTransport(ni, nodeKey)
if err := t.Listen(*addr); err != nil {
panic(err)
}
// TODO: let the config be passed in?
nodeKey := &NodeKey{
PrivKey: ed25519.GenPrivKey(),
}
sw := NewSwitch(cfg)
sw := initSwitch(i, NewSwitch(cfg, t, opts...))
sw.SetLogger(log.TestingLogger())
sw = initSwitch(i, sw)
addr := fmt.Sprintf("127.0.0.1:%d", cmn.RandIntn(64512)+1023)
ni := NodeInfo{
ID: nodeKey.ID(),
Moniker: fmt.Sprintf("switch%d", i),
Network: network,
Version: version,
ListenAddr: addr,
Other: NodeInfoOther{
AminoVersion: "1.0",
P2PVersion: "1.0",
ConsensusVersion: "1.0",
RPCVersion: "1.0",
TxIndex: "off",
RPCAddress: addr,
},
}
sw.SetNodeKey(&nodeKey)
for ch := range sw.reactorsByCh {
ni.Channels = append(ni.Channels, ch)
}
// TODO: We need to setup reactors ahead of time so the NodeInfo is properly
// populated and we don't have to do those awkward overrides and setters.
t.nodeInfo = ni
sw.SetNodeInfo(ni)
sw.SetNodeKey(nodeKey)
return sw
}
func testInboundPeerConn(
conn net.Conn,
config *config.P2PConfig,
ourNodePrivKey crypto.PrivKey,
) (peerConn, error) {
return testPeerConn(conn, config, false, false, ourNodePrivKey, nil)
}
func testPeerConn(
rawConn net.Conn,
cfg *config.P2PConfig,
outbound, persistent bool,
ourNodePrivKey crypto.PrivKey,
originalAddr *NetAddress,
) (pc peerConn, err error) {
conn := rawConn
// Fuzz connection
if cfg.TestFuzz {
// so we have time to do peer handshakes and get set up
conn = FuzzConnAfterFromConfig(conn, 10*time.Second, cfg.TestFuzzConfig)
}
// Encrypt connection
conn, err = upgradeSecretConn(conn, cfg.HandshakeTimeout, ourNodePrivKey)
if err != nil {
return pc, cmn.ErrorWrap(err, "Error creating peer")
}
// Only the information we already have
return peerConn{
config: cfg,
outbound: outbound,
persistent: persistent,
conn: conn,
originalAddr: originalAddr,
}, nil
}

View File

@ -191,7 +191,7 @@ func Validators(heightPtr *int64) (*ctypes.ResultValidators, error) {
// ```
func DumpConsensusState() (*ctypes.ResultDumpConsensusState, error) {
// Get Peer consensus states.
peers := p2pSwitch.Peers().List()
peers := p2pPeers.Peers().List()
peerStates := make([]ctypes.PeerStateInfo, len(peers))
for i, peer := range peers {
peerState := peer.Get(types.PeerStateKey).(*cm.PeerState)

View File

@ -35,13 +35,8 @@ import (
// }
// ```
func NetInfo() (*ctypes.ResultNetInfo, error) {
listening := p2pSwitch.IsListening()
listeners := []string{}
for _, listener := range p2pSwitch.Listeners() {
listeners = append(listeners, listener.String())
}
peers := []ctypes.Peer{}
for _, peer := range p2pSwitch.Peers().List() {
for _, peer := range p2pPeers.Peers().List() {
peers = append(peers, ctypes.Peer{
NodeInfo: peer.NodeInfo(),
IsOutbound: peer.IsOutbound(),
@ -52,8 +47,8 @@ func NetInfo() (*ctypes.ResultNetInfo, error) {
// PRO: useful info
// CON: privacy
return &ctypes.ResultNetInfo{
Listening: listening,
Listeners: listeners,
Listening: p2pTransport.IsListening(),
Listeners: p2pTransport.Listeners(),
NPeers: len(peers),
Peers: peers,
}, nil
@ -65,7 +60,7 @@ func UnsafeDialSeeds(seeds []string) (*ctypes.ResultDialSeeds, error) {
}
// starts go routines to dial each peer after random delays
logger.Info("DialSeeds", "addrBook", addrBook, "seeds", seeds)
err := p2pSwitch.DialPeersAsync(addrBook, seeds, false)
err := p2pPeers.DialPeersAsync(addrBook, seeds, false)
if err != nil {
return &ctypes.ResultDialSeeds{}, err
}
@ -78,7 +73,7 @@ func UnsafeDialPeers(peers []string, persistent bool) (*ctypes.ResultDialPeers,
}
// starts go routines to dial each peer after random delays
logger.Info("DialPeers", "addrBook", addrBook, "peers", peers, "persistent", persistent)
err := p2pSwitch.DialPeersAsync(addrBook, peers, persistent)
err := p2pPeers.DialPeersAsync(addrBook, peers, persistent)
if err != nil {
return &ctypes.ResultDialPeers{}, err
}

View File

@ -34,13 +34,16 @@ type Consensus interface {
GetRoundStateSimpleJSON() ([]byte, error)
}
type P2P interface {
Listeners() []p2p.Listener
Peers() p2p.IPeerSet
NumPeers() (outbound, inbound, dialig int)
NodeInfo() p2p.NodeInfo
type transport interface {
Listeners() []string
IsListening() bool
NodeInfo() p2p.NodeInfo
}
type peers interface {
DialPeersAsync(p2p.AddrBook, []string, bool) error
NumPeers() (outbound, inbound, dialig int)
Peers() p2p.IPeerSet
}
//----------------------------------------------
@ -56,7 +59,8 @@ var (
blockStore sm.BlockStore
evidencePool sm.EvidencePool
consensusState Consensus
p2pSwitch P2P
p2pPeers peers
p2pTransport transport
// objects
pubKey crypto.PubKey
@ -90,8 +94,12 @@ func SetConsensusState(cs Consensus) {
consensusState = cs
}
func SetSwitch(sw P2P) {
p2pSwitch = sw
func SetP2PPeers(p peers) {
p2pPeers = p
}
func SetP2PTransport(t transport) {
p2pTransport = t
}
func SetPubKey(pk crypto.PubKey) {

View File

@ -91,7 +91,7 @@ func Status() (*ctypes.ResultStatus, error) {
}
result := &ctypes.ResultStatus{
NodeInfo: p2pSwitch.NodeInfo(),
NodeInfo: p2pTransport.NodeInfo(),
SyncInfo: ctypes.SyncInfo{
LatestBlockHash: latestBlockHash,
LatestAppHash: latestAppHash,