diff --git a/p2p/base_reactor.go b/p2p/base_reactor.go index e8107d73..a24a7629 100644 --- a/p2p/base_reactor.go +++ b/p2p/base_reactor.go @@ -1,12 +1,15 @@ package p2p -import cmn "github.com/tendermint/tmlibs/common" +import ( + "github.com/tendermint/tendermint/p2p/tmconn" + cmn "github.com/tendermint/tmlibs/common" +) type Reactor interface { cmn.Service // Start, Stop SetSwitch(*Switch) - GetChannels() []*ChannelDescriptor + GetChannels() []*tmconn.ChannelDescriptor AddPeer(peer Peer) RemovePeer(peer Peer, reason interface{}) Receive(chID byte, peer Peer, msgBytes []byte) // CONTRACT: msgBytes are not nil @@ -29,7 +32,7 @@ func NewBaseReactor(name string, impl Reactor) *BaseReactor { func (br *BaseReactor) SetSwitch(sw *Switch) { br.Switch = sw } -func (_ *BaseReactor) GetChannels() []*ChannelDescriptor { return nil } +func (_ *BaseReactor) GetChannels() []*tmconn.ChannelDescriptor { return nil } func (_ *BaseReactor) AddPeer(peer Peer) {} func (_ *BaseReactor) RemovePeer(peer Peer, reason interface{}) {} func (_ *BaseReactor) Receive(chID byte, peer Peer, msgBytes []byte) {} diff --git a/p2p/listener.go b/p2p/listener.go index 884c45ee..01d71833 100644 --- a/p2p/listener.go +++ b/p2p/listener.go @@ -6,6 +6,7 @@ import ( "strconv" "time" + "github.com/tendermint/tendermint/p2p/types" "github.com/tendermint/tendermint/p2p/upnp" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" @@ -13,8 +14,8 @@ import ( type Listener interface { Connections() <-chan net.Conn - InternalAddress() *NetAddress - ExternalAddress() *NetAddress + InternalAddress() *types.NetAddress + ExternalAddress() *types.NetAddress String() string Stop() error } @@ -24,8 +25,8 @@ type DefaultListener struct { cmn.BaseService listener net.Listener - intAddr *NetAddress - extAddr *NetAddress + intAddr *types.NetAddress + extAddr *types.NetAddress connections chan net.Conn } @@ -71,14 +72,14 @@ func NewDefaultListener(protocol string, lAddr string, skipUPNP bool, logger log logger.Info("Local listener", "ip", listenerIP, "port", listenerPort) // Determine internal address... - var intAddr *NetAddress - intAddr, err = NewNetAddressString(lAddr) + var intAddr *types.NetAddress + intAddr, err = types.NewNetAddressString(lAddr) if err != nil { panic(err) } // Determine external address... - var extAddr *NetAddress + var extAddr *types.NetAddress if !skipUPNP { // If the lAddrIP is INADDR_ANY, try UPnP if lAddrIP == "" || lAddrIP == "0.0.0.0" { @@ -151,11 +152,11 @@ func (l *DefaultListener) Connections() <-chan net.Conn { return l.connections } -func (l *DefaultListener) InternalAddress() *NetAddress { +func (l *DefaultListener) InternalAddress() *types.NetAddress { return l.intAddr } -func (l *DefaultListener) ExternalAddress() *NetAddress { +func (l *DefaultListener) ExternalAddress() *types.NetAddress { return l.extAddr } @@ -172,7 +173,7 @@ func (l *DefaultListener) String() string { /* external address helpers */ // UPNP external address discovery & port mapping -func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *NetAddress { +func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) *types.NetAddress { logger.Info("Getting UPNP external address") nat, err := upnp.Discover() if err != nil { @@ -198,11 +199,11 @@ func getUPNPExternalAddress(externalPort, internalPort int, logger log.Logger) * } logger.Info("Got UPNP external address", "address", ext) - return NewNetAddressIPPort(ext, uint16(externalPort)) + return types.NewNetAddressIPPort(ext, uint16(externalPort)) } // TODO: use syscalls: see issue #712 -func getNaiveExternalAddress(port int, settleForLocal bool, logger log.Logger) *NetAddress { +func getNaiveExternalAddress(port int, settleForLocal bool, logger log.Logger) *types.NetAddress { addrs, err := net.InterfaceAddrs() if err != nil { panic(cmn.Fmt("Could not fetch interface addresses: %v", err)) @@ -217,7 +218,7 @@ func getNaiveExternalAddress(port int, settleForLocal bool, logger log.Logger) * if v4 == nil || (!settleForLocal && v4[0] == 127) { continue } // loopback - return NewNetAddressIPPort(ipnet.IP, uint16(port)) + return types.NewNetAddressIPPort(ipnet.IP, uint16(port)) } // try again, but settle for local diff --git a/p2p/peer.go b/p2p/peer.go index 596b9216..17c5861f 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -11,17 +11,20 @@ import ( wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" ) // Peer is an interface representing a peer connected on a reactor. type Peer interface { cmn.Service - ID() ID // peer's cryptographic ID - IsOutbound() bool // did we dial the peer - IsPersistent() bool // do we redial this peer when we disconnect - NodeInfo() NodeInfo // peer's info - Status() ConnectionStatus + ID() types.ID // peer's cryptographic ID + IsOutbound() bool // did we dial the peer + IsPersistent() bool // do we redial this peer when we disconnect + NodeInfo() types.NodeInfo // peer's info + Status() tmconn.ConnectionStatus Send(byte, interface{}) bool TrySend(byte, interface{}) bool @@ -40,13 +43,13 @@ type peer struct { outbound bool - conn net.Conn // source connection - mconn *MConnection // multiplex connection + conn net.Conn // source connection + mconn *tmconn.MConnection // multiplex connection persistent bool config *PeerConfig - nodeInfo NodeInfo + nodeInfo types.NodeInfo Data *cmn.CMap // User data. } @@ -58,7 +61,7 @@ type PeerConfig struct { HandshakeTimeout time.Duration `mapstructure:"handshake_timeout"` DialTimeout time.Duration `mapstructure:"dial_timeout"` - MConfig *MConnConfig `mapstructure:"connection"` + MConfig *tmconn.MConnConfig `mapstructure:"connection"` Fuzz bool `mapstructure:"fuzz"` // fuzz connection (for testing) FuzzConfig *FuzzConnConfig `mapstructure:"fuzz_config"` @@ -70,13 +73,13 @@ func DefaultPeerConfig() *PeerConfig { AuthEnc: true, HandshakeTimeout: 20, // * time.Second, DialTimeout: 3, // * time.Second, - MConfig: DefaultMConnConfig(), + MConfig: tmconn.DefaultMConnConfig(), Fuzz: false, FuzzConfig: DefaultFuzzConnConfig(), } } -func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, +func newOutboundPeer(addr *types.NetAddress, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig, persistent bool) (*peer, error) { conn, err := dial(addr, config) @@ -96,7 +99,7 @@ func newOutboundPeer(addr *NetAddress, reactorsByCh map[byte]Reactor, chDescs [] return peer, nil } -func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, +func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { // TODO: issue PoW challenge @@ -104,7 +107,7 @@ func newInboundPeer(conn net.Conn, reactorsByCh map[byte]Reactor, chDescs []*Cha return newPeerFromConnAndConfig(conn, false, reactorsByCh, chDescs, onPeerError, ourNodePrivKey, config) } -func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, +func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, onPeerError func(Peer, interface{}), ourNodePrivKey crypto.PrivKey, config *PeerConfig) (*peer, error) { conn := rawConn @@ -122,7 +125,7 @@ func newPeerFromConnAndConfig(rawConn net.Conn, outbound bool, reactorsByCh map[ } var err error - conn, err = MakeSecretConnection(conn, ourNodePrivKey) + conn, err = tmconn.MakeSecretConnection(conn, ourNodePrivKey) if err != nil { return nil, errors.Wrap(err, "Error creating peer") } @@ -171,8 +174,8 @@ func (p *peer) OnStop() { // Implements Peer // ID returns the peer's ID - the hex encoded hash of its pubkey. -func (p *peer) ID() ID { - return PubKeyToID(p.PubKey()) +func (p *peer) ID() types.ID { + return types.PubKeyToID(p.PubKey()) } // IsOutbound returns true if the connection is outbound, false otherwise. @@ -186,12 +189,12 @@ func (p *peer) IsPersistent() bool { } // NodeInfo returns a copy of the peer's NodeInfo. -func (p *peer) NodeInfo() NodeInfo { +func (p *peer) NodeInfo() types.NodeInfo { return p.nodeInfo } // Status returns the peer's ConnectionStatus. -func (p *peer) Status() ConnectionStatus { +func (p *peer) Status() tmconn.ConnectionStatus { return p.mconn.Status() } @@ -236,13 +239,13 @@ func (p *peer) CloseConn() { // HandshakeTimeout performs the Tendermint P2P handshake between a given node and the peer // by exchanging their NodeInfo. It sets the received nodeInfo on the peer. // NOTE: blocking -func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) error { +func (p *peer) HandshakeTimeout(ourNodeInfo types.NodeInfo, timeout time.Duration) error { // Set deadline for handshake so we don't block forever on conn.ReadFull if err := p.conn.SetDeadline(time.Now().Add(timeout)); err != nil { return errors.Wrap(err, "Error setting deadline") } - var peerNodeInfo NodeInfo + var peerNodeInfo types.NodeInfo var err1 error var err2 error cmn.Parallel( @@ -252,7 +255,7 @@ func (p *peer) HandshakeTimeout(ourNodeInfo NodeInfo, timeout time.Duration) err }, func() { var n int - wire.ReadBinary(&peerNodeInfo, p.conn, maxNodeInfoSize, &n, &err2) + wire.ReadBinary(&peerNodeInfo, p.conn, types.MaxNodeInfoSize(), &n, &err2) p.Logger.Info("Peer handshake", "peerNodeInfo", peerNodeInfo) }) if err1 != nil { @@ -283,7 +286,7 @@ func (p *peer) PubKey() crypto.PubKey { if !p.nodeInfo.PubKey.Empty() { return p.nodeInfo.PubKey } else if p.config.AuthEnc { - return p.conn.(*SecretConnection).RemotePubKey() + return p.conn.(*tmconn.SecretConnection).RemotePubKey() } panic("Attempt to get peer's PubKey before calling Handshake") } @@ -308,7 +311,7 @@ func (p *peer) String() string { //------------------------------------------------------------------ // helper funcs -func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { +func dial(addr *types.NetAddress, config *PeerConfig) (net.Conn, error) { conn, err := addr.DialTimeout(config.DialTimeout * time.Second) if err != nil { return nil, err @@ -316,8 +319,8 @@ func dial(addr *NetAddress, config *PeerConfig) (net.Conn, error) { return conn, nil } -func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*ChannelDescriptor, - onPeerError func(Peer, interface{}), config *MConnConfig) *MConnection { +func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, chDescs []*tmconn.ChannelDescriptor, + onPeerError func(Peer, interface{}), config *tmconn.MConnConfig) *tmconn.MConnection { onReceive := func(chID byte, msgBytes []byte) { reactor := reactorsByCh[chID] @@ -331,5 +334,5 @@ func createMConnection(conn net.Conn, p *peer, reactorsByCh map[byte]Reactor, ch onPeerError(p, r) } - return NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) + return tmconn.NewMConnectionWithConfig(conn, chDescs, onReceive, onError, config) } diff --git a/p2p/peer_set.go b/p2p/peer_set.go index dc53174a..7a0680cb 100644 --- a/p2p/peer_set.go +++ b/p2p/peer_set.go @@ -2,12 +2,14 @@ package p2p import ( "sync" + + "github.com/tendermint/tendermint/p2p/types" ) // IPeerSet has a (immutable) subset of the methods of PeerSet. type IPeerSet interface { - Has(key ID) bool - Get(key ID) Peer + Has(key types.ID) bool + Get(key types.ID) Peer List() []Peer Size() int } @@ -18,7 +20,7 @@ type IPeerSet interface { // Iteration over the peers is super fast and thread-safe. type PeerSet struct { mtx sync.Mutex - lookup map[ID]*peerSetItem + lookup map[types.ID]*peerSetItem list []Peer } @@ -30,7 +32,7 @@ type peerSetItem struct { // NewPeerSet creates a new peerSet with a list of initial capacity of 256 items. func NewPeerSet() *PeerSet { return &PeerSet{ - lookup: make(map[ID]*peerSetItem), + lookup: make(map[types.ID]*peerSetItem), list: make([]Peer, 0, 256), } } @@ -41,7 +43,7 @@ func (ps *PeerSet) Add(peer Peer) error { ps.mtx.Lock() defer ps.mtx.Unlock() if ps.lookup[peer.ID()] != nil { - return ErrSwitchDuplicatePeer + return types.ErrSwitchDuplicatePeer } index := len(ps.list) @@ -54,7 +56,7 @@ func (ps *PeerSet) Add(peer Peer) error { // Has returns true iff the PeerSet contains // the peer referred to by this peerKey. -func (ps *PeerSet) Has(peerKey ID) bool { +func (ps *PeerSet) Has(peerKey types.ID) bool { ps.mtx.Lock() _, ok := ps.lookup[peerKey] ps.mtx.Unlock() @@ -62,7 +64,7 @@ func (ps *PeerSet) Has(peerKey ID) bool { } // Get looks up a peer by the provided peerKey. -func (ps *PeerSet) Get(peerKey ID) Peer { +func (ps *PeerSet) Get(peerKey types.ID) Peer { ps.mtx.Lock() defer ps.mtx.Unlock() item, ok := ps.lookup[peerKey] diff --git a/p2p/peer_set_test.go b/p2p/peer_set_test.go index e906eb8e..7d7ed106 100644 --- a/p2p/peer_set_test.go +++ b/p2p/peer_set_test.go @@ -8,13 +8,14 @@ import ( "github.com/stretchr/testify/assert" crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/tendermint/p2p/types" cmn "github.com/tendermint/tmlibs/common" ) // Returns an empty dummy peer func randPeer() *peer { return &peer{ - nodeInfo: NodeInfo{ + nodeInfo: types.NodeInfo{ ListenAddr: cmn.Fmt("%v.%v.%v.%v:46656", rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256), PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), }, @@ -119,7 +120,7 @@ func TestPeerSetAddDuplicate(t *testing.T) { // Our next procedure is to ensure that only one addition // succeeded and that the rest are each ErrSwitchDuplicatePeer. - wantErrCount, gotErrCount := n-1, errsTally[ErrSwitchDuplicatePeer] + wantErrCount, gotErrCount := n-1, errsTally[types.ErrSwitchDuplicatePeer] assert.Equal(t, wantErrCount, gotErrCount, "invalid ErrSwitchDuplicatePeer count") wantNilErrCount, gotNilErrCount := 1, errsTally[nil] diff --git a/p2p/peer_test.go b/p2p/peer_test.go index d99fff5e..dc13cf9d 100644 --- a/p2p/peer_test.go +++ b/p2p/peer_test.go @@ -10,6 +10,8 @@ import ( "github.com/stretchr/testify/require" crypto "github.com/tendermint/go-crypto" + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" ) func TestPeerBasic(t *testing.T) { @@ -80,8 +82,8 @@ func TestPeerSend(t *testing.T) { assert.True(p.Send(0x01, "Asylum")) } -func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) (*peer, error) { - chDescs := []*ChannelDescriptor{ +func createOutboundPeerAndPerformHandshake(addr *types.NetAddress, config *PeerConfig) (*peer, error) { + chDescs := []*tmconn.ChannelDescriptor{ {ID: 0x01, Priority: 1}, } reactorsByCh := map[byte]Reactor{0x01: NewTestReactor(chDescs, true)} @@ -90,7 +92,7 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) if err != nil { return nil, err } - err = p.HandshakeTimeout(NodeInfo{ + err = p.HandshakeTimeout(types.NodeInfo{ PubKey: pk.PubKey(), Moniker: "host_peer", Network: "testing", @@ -105,11 +107,11 @@ func createOutboundPeerAndPerformHandshake(addr *NetAddress, config *PeerConfig) type remotePeer struct { PrivKey crypto.PrivKey Config *PeerConfig - addr *NetAddress + addr *types.NetAddress quit chan struct{} } -func (p *remotePeer) Addr() *NetAddress { +func (p *remotePeer) Addr() *types.NetAddress { return p.addr } @@ -122,7 +124,7 @@ func (p *remotePeer) Start() { if e != nil { golog.Fatalf("net.Listen tcp :0: %+v", e) } - p.addr = NewNetAddress("", l.Addr()) + p.addr = types.NewNetAddress("", l.Addr()) p.quit = make(chan struct{}) go p.accept(l) } @@ -137,11 +139,11 @@ func (p *remotePeer) accept(l net.Listener) { if err != nil { golog.Fatalf("Failed to accept conn: %+v", err) } - peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config) + peer, err := newInboundPeer(conn, make(map[byte]Reactor), make([]*tmconn.ChannelDescriptor, 0), func(p Peer, r interface{}) {}, p.PrivKey, p.Config) if err != nil { golog.Fatalf("Failed to create a peer: %+v", err) } - err = peer.HandshakeTimeout(NodeInfo{ + err = peer.HandshakeTimeout(types.NodeInfo{ PubKey: p.PrivKey.PubKey(), Moniker: "remote_peer", Network: "testing", diff --git a/p2p/addrbook/addrbook.go b/p2p/pex/addrbook.go similarity index 76% rename from p2p/addrbook/addrbook.go rename to p2p/pex/addrbook.go index 1f317a2e..93f35211 100644 --- a/p2p/addrbook/addrbook.go +++ b/p2p/pex/addrbook.go @@ -2,7 +2,7 @@ // Originally Copyright (c) 2013-2014 Conformal Systems LLC. // https://github.com/conformal/btcd/blob/master/LICENSE -package addrbook +package pex import ( "crypto/sha256" @@ -16,6 +16,8 @@ import ( crypto "github.com/tendermint/go-crypto" cmn "github.com/tendermint/tmlibs/common" + + "github.com/tendermint/tendermint/p2p/types" ) const ( @@ -29,25 +31,33 @@ const ( type AddrBook interface { cmn.Service + // Add our own addresses so we don't later add ourselves + AddOurAddress(*types.NetAddress) + // Add and remove an address - AddAddress(addr *NetAddress, src *NetAddress) - RemoveAddress(addr *NetAddress) + AddAddress(addr *types.NetAddress, src *types.NetAddress) error + RemoveAddress(addr *types.NetAddress) // Do we need more peers? NeedMoreAddrs() bool // Pick an address to dial - PickAddress(newBias int) *NetAddress + PickAddress(newBias int) *types.NetAddress // Mark address - MarkGood(*NetAddress) - MarkAttempt(*Address) - MarkBad(*NetAddress) + MarkGood(*types.NetAddress) + MarkAttempt(*types.NetAddress) + MarkBad(*types.NetAddress) // Send a selection of addresses to peers - GetSelection() []*NetAddress + GetSelection() []*types.NetAddress + + // TODO: remove + ListOfKnownAddresses() []*knownAddress } +var _ AddrBook = (*addrBook)(nil) + // addrBook - concurrency safe peer address manager. // Implements AddrBook. type addrBook struct { @@ -56,13 +66,13 @@ type addrBook struct { // immutable after creation filePath string routabilityStrict bool - key string + key string // random prefix for bucket placement // accessed concurrently mtx sync.Mutex rand *rand.Rand - ourAddrs map[string]*NetAddress - addrLookup map[ID]*knownAddress // new & old + ourAddrs map[string]*types.NetAddress + addrLookup map[types.ID]*knownAddress // new & old bucketsOld []map[string]*knownAddress bucketsNew []map[string]*knownAddress nOld int @@ -74,10 +84,10 @@ type addrBook struct { // NewAddrBook creates a new address book. // Use Start to begin processing asynchronous address updates. func NewAddrBook(filePath string, routabilityStrict bool) *addrBook { - am := &AddrBook{ - rand: rand.New(rand.NewSource(time.Now().UnixNano())), - ourAddrs: make(map[string]*NetAddress), - addrLookup: make(map[ID]*knownAddress), + am := &addrBook{ + rand: rand.New(rand.NewSource(time.Now().UnixNano())), // TODO: seed from outside + ourAddrs: make(map[string]*types.NetAddress), + addrLookup: make(map[types.ID]*knownAddress), filePath: filePath, routabilityStrict: routabilityStrict, } @@ -126,26 +136,26 @@ func (a *addrBook) Wait() { a.wg.Wait() } -// AddOurAddress adds another one of our addresses. -func (a *AddrBook) AddOurAddress(addr *NetAddress) { +//------------------------------------------------------- + +// AddOurAddress one of our addresses. +func (a *addrBook) AddOurAddress(addr *types.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() a.Logger.Info("Add our address to book", "addr", addr) a.ourAddrs[addr.String()] = addr } -//------------------------------------------------------- - // AddAddress implements AddrBook - adds the given address as received from the given source. // NOTE: addr must not be nil -func (a *addrBook) AddAddress(addr *NetAddress, src *NetAddress) error { +func (a *addrBook) AddAddress(addr *types.NetAddress, src *types.NetAddress) error { a.mtx.Lock() defer a.mtx.Unlock() return a.addAddress(addr, src) } // RemoveAddress implements AddrBook - removes the address from the book. -func (a *addrBook) RemoveAddress(addr *NetAddress) { +func (a *addrBook) RemoveAddress(addr *types.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] @@ -167,7 +177,7 @@ func (a *addrBook) NeedMoreAddrs() bool { // and determines how biased we are to pick an address from a new bucket. // PickAddress returns nil if the AddrBook is empty or if we try to pick // from an empty bucket. -func (a *addrBook) PickAddress(newBias int) *NetAddress { +func (a *addrBook) PickAddress(newBias int) *types.NetAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -213,7 +223,7 @@ func (a *addrBook) PickAddress(newBias int) *NetAddress { // MarkGood implements AddrBook - it marks the peer as good and // moves it into an "old" bucket. -func (a *addrBook) MarkGood(addr *NetAddress) { +func (a *addrBook) MarkGood(addr *types.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] @@ -227,7 +237,7 @@ func (a *addrBook) MarkGood(addr *NetAddress) { } // MarkAttempt implements AddrBook - it marks that an attempt was made to connect to the address. -func (a *addrBook) MarkAttempt(addr *NetAddress) { +func (a *addrBook) MarkAttempt(addr *types.NetAddress) { a.mtx.Lock() defer a.mtx.Unlock() ka := a.addrLookup[addr.ID] @@ -239,13 +249,13 @@ func (a *addrBook) MarkAttempt(addr *NetAddress) { // MarkBad implements AddrBook. Currently it just ejects the address. // TODO: black list for some amount of time -func (a *addrBook) MarkBad(addr *NetAddress) { +func (a *addrBook) MarkBad(addr *types.NetAddress) { a.RemoveAddress(addr) } // GetSelection implements AddrBook. // It randomly selects some addresses (old & new). Suitable for peer-exchange protocols. -func (a *addrBook) GetSelection() []*NetAddress { +func (a *addrBook) GetSelection() []*types.NetAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -253,7 +263,7 @@ func (a *addrBook) GetSelection() []*NetAddress { return nil } - allAddr := make([]*NetAddress, a.size()) + allAddr := make([]*types.NetAddress, a.size()) i := 0 for _, ka := range a.addrLookup { allAddr[i] = ka.Addr @@ -279,7 +289,7 @@ func (a *addrBook) GetSelection() []*NetAddress { } // ListOfKnownAddresses returns the new and old addresses. -func (a *AddrBook) ListOfKnownAddresses() []*knownAddress { +func (a *addrBook) ListOfKnownAddresses() []*knownAddress { a.mtx.Lock() defer a.mtx.Unlock() @@ -290,17 +300,6 @@ func (a *AddrBook) ListOfKnownAddresses() []*knownAddress { return addrs } -/* Loading & Saving */ - -type addrBookJSON struct { - Key string - Addrs []*knownAddress -} - -func (a *AddrBook) saveToFile(filePath string) { - a.Logger.Info("Saving AddrBook to file", "size", a.Size()) -} - //------------------------------------------------ // Size returns the number of addresses in the book. @@ -334,6 +333,8 @@ out: a.Logger.Info("Address handler done") } +//---------------------------------------------------------- + func (a *addrBook) getBucket(bucketType byte, bucketIdx int) map[string]*knownAddress { switch bucketType { case bucketTypeNew: @@ -365,17 +366,18 @@ func (a *addrBook) addToNewBucket(ka *knownAddress, bucketIdx int) bool { // Enforce max addresses. if len(bucket) > newBucketSize { - a.Logger.Info("new bucket is full, expiring old ") + a.Logger.Info("new bucket is full, expiring new") a.expireNew(bucketIdx) } // Add to bucket. bucket[addrStr] = ka + // increment nNew if the peer doesnt already exist in a bucket if ka.addBucketRef(bucketIdx) == 1 { a.nNew++ } - // Ensure in addrLookup + // Add it to addrLookup a.addrLookup[ka.ID()] = ka return true @@ -449,6 +451,8 @@ func (a *addrBook) removeFromAllBuckets(ka *knownAddress) { delete(a.addrLookup, ka.ID()) } +//---------------------------------------------------------- + func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { bucket := a.getBucket(bucketType, bucketIdx) var oldest *knownAddress @@ -460,7 +464,9 @@ func (a *addrBook) pickOldest(bucketType byte, bucketIdx int) *knownAddress { return oldest } -func (a *addrBook) addAddress(addr, src *NetAddress) error { +// adds the address to a "new" bucket. if its already in one, +// it only adds it probabilistically +func (a *addrBook) addAddress(addr, src *types.NetAddress) error { if a.routabilityStrict && !addr.Routable() { return fmt.Errorf("Cannot add non-routable address %v", addr) } @@ -490,7 +496,10 @@ func (a *addrBook) addAddress(addr, src *NetAddress) error { } bucket := a.calcNewBucket(addr, src) - a.addToNewBucket(ka, bucket) + added := a.addToNewBucket(ka, bucket) + if !added { + a.Logger.Info("Can't add new address, addr book is full", "address", addr, "total", a.size()) + } a.Logger.Info("Added new address", "address", addr, "total", a.size()) return nil @@ -513,9 +522,9 @@ func (a *addrBook) expireNew(bucketIdx int) { a.removeFromBucket(oldest, bucketTypeNew, bucketIdx) } -// Promotes an address from new to old. -// TODO: Move to old probabilistically. -// The better a node is, the less likely it should be evicted from an old bucket. +// Promotes an address from new to old. If the destination bucket is full, +// demote the oldest one to a "new" bucket. +// TODO: Demote more probabilistically? func (a *addrBook) moveToOld(ka *knownAddress) { // Sanity check if ka.isOld() { @@ -559,9 +568,12 @@ func (a *addrBook) moveToOld(ka *knownAddress) { } } +//--------------------------------------------------------------------- +// calculate bucket placements + // doublesha256( key + sourcegroup + // int64(doublesha256(key + group + sourcegroup))%bucket_per_group ) % num_new_buckets -func (a *addrBook) calcNewBucket(addr, src *NetAddress) int { +func (a *addrBook) calcNewBucket(addr, src *types.NetAddress) int { data1 := []byte{} data1 = append(data1, []byte(a.key)...) data1 = append(data1, []byte(a.groupKey(addr))...) @@ -582,7 +594,7 @@ func (a *addrBook) calcNewBucket(addr, src *NetAddress) int { // doublesha256( key + group + // int64(doublesha256(key + addr))%buckets_per_group ) % num_old_buckets -func (a *addrBook) calcOldBucket(addr *NetAddress) int { +func (a *addrBook) calcOldBucket(addr *types.NetAddress) int { data1 := []byte{} data1 = append(data1, []byte(a.key)...) data1 = append(data1, []byte(addr.String())...) @@ -604,7 +616,7 @@ func (a *addrBook) calcOldBucket(addr *NetAddress) int { // This is the /16 for IPv4, the /32 (/36 for he.net) for IPv6, the string // "local" for a local address and the string "unroutable" for an unroutable // address. -func (a *addrBook) groupKey(na *NetAddress) string { +func (a *addrBook) groupKey(na *types.NetAddress) string { if a.routabilityStrict && na.Local() { return "local" } @@ -649,137 +661,6 @@ func (a *addrBook) groupKey(na *NetAddress) string { return (&net.IPNet{IP: na.IP, Mask: net.CIDRMask(bits, 128)}).String() } -//----------------------------------------------------------------------------- - -/* - knownAddress - - tracks information about a known network address that is used - to determine how viable an address is. -*/ -type knownAddress struct { - Addr *NetAddress - Src *NetAddress - Attempts int32 - LastAttempt time.Time - LastSuccess time.Time - BucketType byte - Buckets []int -} - -func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { - return &knownAddress{ - Addr: addr, - Src: src, - Attempts: 0, - LastAttempt: time.Now(), - BucketType: bucketTypeNew, - Buckets: nil, - } -} - -func (ka *knownAddress) ID() ID { - return ka.Addr.ID -} - -func (ka *knownAddress) isOld() bool { - return ka.BucketType == bucketTypeOld -} - -func (ka *knownAddress) isNew() bool { - return ka.BucketType == bucketTypeNew -} - -func (ka *knownAddress) markAttempt() { - now := time.Now() - ka.LastAttempt = now - ka.Attempts += 1 -} - -func (ka *knownAddress) markGood() { - now := time.Now() - ka.LastAttempt = now - ka.Attempts = 0 - ka.LastSuccess = now -} - -func (ka *knownAddress) addBucketRef(bucketIdx int) int { - for _, bucket := range ka.Buckets { - if bucket == bucketIdx { - // TODO refactor to return error? - // log.Warn(Fmt("Bucket already exists in ka.Buckets: %v", ka)) - return -1 - } - } - ka.Buckets = append(ka.Buckets, bucketIdx) - return len(ka.Buckets) -} - -func (ka *knownAddress) removeBucketRef(bucketIdx int) int { - buckets := []int{} - for _, bucket := range ka.Buckets { - if bucket != bucketIdx { - buckets = append(buckets, bucket) - } - } - if len(buckets) != len(ka.Buckets)-1 { - // TODO refactor to return error? - // log.Warn(Fmt("bucketIdx not found in ka.Buckets: %v", ka)) - return -1 - } - ka.Buckets = buckets - return len(ka.Buckets) -} - -/* - An address is bad if the address in question is a New address, has not been tried in the last - minute, and meets one of the following criteria: - - 1) It claims to be from the future - 2) It hasn't been seen in over a month - 3) It has failed at least three times and never succeeded - 4) It has failed ten times in the last week - - All addresses that meet these criteria are assumed to be worthless and not - worth keeping hold of. - - XXX: so a good peer needs us to call MarkGood before the conditions above are reached! -*/ -func (ka *knownAddress) isBad() bool { - // Is Old --> good - if ka.BucketType == bucketTypeOld { - return false - } - - // Has been attempted in the last minute --> good - if ka.LastAttempt.Before(time.Now().Add(-1 * time.Minute)) { - return false - } - - // Too old? - // XXX: does this mean if we've kept a connection up for this long we'll disconnect?! - // and shouldn't it be .Before ? - if ka.LastAttempt.After(time.Now().Add(-1 * numMissingDays * time.Hour * 24)) { - return true - } - - // Never succeeded? - if ka.LastSuccess.IsZero() && ka.Attempts >= numRetries { - return true - } - - // Hasn't succeeded in too long? - // XXX: does this mean if we've kept a connection up for this long we'll disconnect?! - if ka.LastSuccess.Before(time.Now().Add(-1*minBadDays*time.Hour*24)) && - ka.Attempts >= maxFailures { - return true - } - - return false -} - -//----------------------------------------------------------------------------- - // doubleSha256 calculates sha256(sha256(b)) and returns the resulting bytes. func doubleSha256(b []byte) []byte { hasher := sha256.New() diff --git a/p2p/addrbook/addrbook_test.go b/p2p/pex/addrbook_test.go similarity index 93% rename from p2p/addrbook/addrbook_test.go rename to p2p/pex/addrbook_test.go index ff8d239d..206e3401 100644 --- a/p2p/addrbook/addrbook_test.go +++ b/p2p/pex/addrbook_test.go @@ -1,4 +1,4 @@ -package addrbook +package pex import ( "encoding/hex" @@ -8,6 +8,7 @@ import ( "testing" "github.com/stretchr/testify/assert" + "github.com/tendermint/tendermint/p2p/types" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" ) @@ -168,8 +169,8 @@ func TestAddrBookHandlesDuplicates(t *testing.T) { } type netAddressPair struct { - addr *NetAddress - src *NetAddress + addr *types.NetAddress + src *types.NetAddress } func randNetAddressPairs(t *testing.T, n int) []netAddressPair { @@ -180,7 +181,7 @@ func randNetAddressPairs(t *testing.T, n int) []netAddressPair { return randAddrs } -func randIPv4Address(t *testing.T) *NetAddress { +func randIPv4Address(t *testing.T) *types.NetAddress { for { ip := fmt.Sprintf("%v.%v.%v.%v", rand.Intn(254)+1, @@ -189,9 +190,9 @@ func randIPv4Address(t *testing.T) *NetAddress { rand.Intn(255), ) port := rand.Intn(65535-1) + 1 - id := ID(hex.EncodeToString(cmn.RandBytes(IDByteLength))) - idAddr := IDAddressString(id, fmt.Sprintf("%v:%v", ip, port)) - addr, err := NewNetAddressString(idAddr) + id := types.ID(hex.EncodeToString(cmn.RandBytes(types.IDByteLength))) + idAddr := types.IDAddressString(id, fmt.Sprintf("%v:%v", ip, port)) + addr, err := types.NewNetAddressString(idAddr) assert.Nil(t, err, "error generating rand network address") if addr.Routable() { return addr diff --git a/p2p/addrbook/file.go b/p2p/pex/file.go similarity index 99% rename from p2p/addrbook/file.go rename to p2p/pex/file.go index 956ac56c..521fcfcf 100644 --- a/p2p/addrbook/file.go +++ b/p2p/pex/file.go @@ -1,4 +1,4 @@ -package addrbook +package pex import ( "encoding/json" diff --git a/p2p/addrbook/known_address.go b/p2p/pex/known_address.go similarity index 92% rename from p2p/addrbook/known_address.go rename to p2p/pex/known_address.go index 2a879081..db6d021f 100644 --- a/p2p/addrbook/known_address.go +++ b/p2p/pex/known_address.go @@ -1,12 +1,16 @@ -package addrbook +package pex -import "time" +import ( + "time" + + "github.com/tendermint/tendermint/p2p/types" +) // knownAddress tracks information about a known network address // that is used to determine how viable an address is. type knownAddress struct { - Addr *NetAddress - Src *NetAddress + Addr *types.NetAddress + Src *types.NetAddress Attempts int32 LastAttempt time.Time LastSuccess time.Time @@ -14,7 +18,7 @@ type knownAddress struct { Buckets []int } -func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { +func newKnownAddress(addr *types.NetAddress, src *types.NetAddress) *knownAddress { return &knownAddress{ Addr: addr, Src: src, @@ -25,7 +29,7 @@ func newKnownAddress(addr *NetAddress, src *NetAddress) *knownAddress { } } -func (ka *knownAddress) ID() ID { +func (ka *knownAddress) ID() types.ID { return ka.Addr.ID } diff --git a/p2p/addrbook/params.go b/p2p/pex/params.go similarity index 98% rename from p2p/addrbook/params.go rename to p2p/pex/params.go index f410ed9a..f94e1021 100644 --- a/p2p/addrbook/params.go +++ b/p2p/pex/params.go @@ -1,4 +1,4 @@ -package addrbook +package pex import "time" diff --git a/p2p/pex_reactor.go b/p2p/pex/pex_reactor.go similarity index 92% rename from p2p/pex_reactor.go rename to p2p/pex/pex_reactor.go index 57665e07..24c9417f 100644 --- a/p2p/pex_reactor.go +++ b/p2p/pex/pex_reactor.go @@ -1,4 +1,4 @@ -package p2p +package pex import ( "bytes" @@ -12,9 +12,13 @@ import ( wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" - "github.com/tendermint/tendermint/p2p/addrbook" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" ) +type Peer = p2p.Peer + const ( // PexChannel is a channel for PEX messages PexChannel = byte(0x00) @@ -47,9 +51,9 @@ const ( // Only accept pexAddrsMsg from peers we sent a corresponding pexRequestMsg too. // Only accept one pexRequestMsg every ~defaultEnsurePeersPeriod. type PEXReactor struct { - BaseReactor + p2p.BaseReactor - book *addrbook.AddrBook + book AddrBook config *PEXReactorConfig ensurePeersPeriod time.Duration @@ -69,7 +73,7 @@ type PEXReactorConfig struct { } // NewPEXReactor creates new PEX reactor. -func NewPEXReactor(b *addrbook.AddrBook, config *PEXReactorConfig) *PEXReactor { +func NewPEXReactor(b AddrBook, config *PEXReactorConfig) *PEXReactor { r := &PEXReactor{ book: b, config: config, @@ -77,7 +81,7 @@ func NewPEXReactor(b *addrbook.AddrBook, config *PEXReactorConfig) *PEXReactor { requestsSent: cmn.NewCMap(), lastReceivedRequests: cmn.NewCMap(), } - r.BaseReactor = *NewBaseReactor("PEXReactor", r) + r.BaseReactor = *p2p.NewBaseReactor("PEXReactor", r) return r } @@ -113,8 +117,8 @@ func (r *PEXReactor) OnStop() { } // GetChannels implements Reactor -func (r *PEXReactor) GetChannels() []*ChannelDescriptor { - return []*ChannelDescriptor{ +func (r *PEXReactor) GetChannels() []*tmconn.ChannelDescriptor { + return []*tmconn.ChannelDescriptor{ { ID: PexChannel, Priority: 1, @@ -227,7 +231,7 @@ func (r *PEXReactor) RequestAddrs(p Peer) { // ReceiveAddrs adds the given addrs to the addrbook if theres an open // request for this peer and deletes the open request. // If there's no open request for the src peer, it returns an error. -func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error { +func (r *PEXReactor) ReceiveAddrs(addrs []*types.NetAddress, src Peer) error { id := string(src.ID()) if !r.requestsSent.Has(id) { @@ -246,7 +250,7 @@ func (r *PEXReactor) ReceiveAddrs(addrs []*NetAddress, src Peer) error { } // SendAddrs sends addrs to the peer. -func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*NetAddress) { +func (r *PEXReactor) SendAddrs(p Peer, netAddrs []*types.NetAddress) { p.Send(PexChannel, struct{ PexMessage }{&pexAddrsMessage{Addrs: netAddrs}}) } @@ -296,7 +300,7 @@ func (r *PEXReactor) ensurePeers() { // NOTE: range here is [10, 90]. Too high ? newBias := cmn.MinInt(numOutPeers, 8)*10 + 10 - toDial := make(map[ID]*NetAddress) + toDial := make(map[types.ID]*types.NetAddress) // Try maxAttempts times to pick numToDial addresses to dial maxAttempts := numToDial * 3 for i := 0; i < maxAttempts && len(toDial) < numToDial; i++ { @@ -319,10 +323,15 @@ func (r *PEXReactor) ensurePeers() { // Dial picked addresses for _, item := range toDial { - go func(picked *NetAddress) { + go func(picked *types.NetAddress) { _, err := r.Switch.DialPeerWithAddress(picked, false) if err != nil { - r.book.MarkAttempt(picked) + // TODO: detect more "bad peer" scenarios + if _, ok := err.(types.ErrSwitchAuthenticationFailure); ok { + r.book.MarkBad(picked) + } else { + r.book.MarkAttempt(picked) + } } }(item) } @@ -351,7 +360,7 @@ func (r *PEXReactor) checkSeeds() error { if lSeeds == 0 { return nil } - _, errs := NewNetAddressStrings(r.config.Seeds) + _, errs := types.NewNetAddressStrings(r.config.Seeds) for _, err := range errs { if err != nil { return err @@ -366,9 +375,10 @@ func (r *PEXReactor) dialSeeds() { if lSeeds == 0 { return } - seedAddrs, _ := NewNetAddressStrings(r.config.Seeds) + seedAddrs, _ := types.NewNetAddressStrings(r.config.Seeds) - perm := r.Switch.rng.Perm(lSeeds) + perm := rand.Perm(lSeeds) + // perm := r.Switch.rng.Perm(lSeeds) for _, i := range perm { // dial a random seed seedAddr := seedAddrs[i] @@ -410,7 +420,7 @@ func (r *PEXReactor) crawlPeersRoutine() { // network crawling performed during seed/crawler mode. type crawlPeerInfo struct { // The listening address of a potential peer we learned about - Addr *NetAddress + Addr *types.NetAddress // The last time we attempt to reach this address LastAttempt time.Time @@ -534,7 +544,7 @@ func (m *pexRequestMessage) String() string { A message with announced peer addresses. */ type pexAddrsMessage struct { - Addrs []*NetAddress + Addrs []*types.NetAddress } func (m *pexAddrsMessage) String() string { diff --git a/p2p/pex_reactor_test.go b/p2p/pex/pex_reactor_test.go similarity index 76% rename from p2p/pex_reactor_test.go rename to p2p/pex/pex_reactor_test.go index 44fd8b51..439914ac 100644 --- a/p2p/pex_reactor_test.go +++ b/p2p/pex/pex_reactor_test.go @@ -1,21 +1,35 @@ -package p2p +package pex import ( "fmt" "io/ioutil" - "math/rand" "os" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + crypto "github.com/tendermint/go-crypto" wire "github.com/tendermint/go-wire" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p" + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" ) +var ( + config *cfg.P2PConfig +) + +func init() { + config = cfg.DefaultP2PConfig() + config.PexReactor = true +} + func TestPEXReactorBasic(t *testing.T) { assert, require := assert.New(t), require.New(t) @@ -45,7 +59,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { r.SetLogger(log.TestingLogger()) size := book.Size() - peer := createRandomPeer(false) + peer := p2p.CreateRandomPeer(false) r.AddPeer(peer) assert.Equal(size+1, book.Size()) @@ -53,7 +67,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { r.RemovePeer(peer, "peer not available") assert.Equal(size+1, book.Size()) - outboundPeer := createRandomPeer(true) + outboundPeer := p2p.CreateRandomPeer(true) r.AddPeer(outboundPeer) assert.Equal(size+1, book.Size(), "outbound peers should not be added to the address book") @@ -64,7 +78,7 @@ func TestPEXReactorAddRemovePeer(t *testing.T) { func TestPEXReactorRunning(t *testing.T) { N := 3 - switches := make([]*Switch, N) + switches := make([]*p2p.Switch, N) dir, err := ioutil.TempDir("", "pex_reactor") require.Nil(t, err) @@ -74,7 +88,7 @@ func TestPEXReactorRunning(t *testing.T) { // create switches for i := 0; i < N; i++ { - switches[i] = makeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + switches[i] = p2p.MakeSwitch(config, i, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { sw.SetLogger(log.TestingLogger().With("switch", i)) r := NewPEXReactor(book, &PEXReactorConfig{}) @@ -87,9 +101,9 @@ func TestPEXReactorRunning(t *testing.T) { // fill the address book and add listeners for _, s := range switches { - addr, _ := NewNetAddressString(s.NodeInfo().ListenAddr) + addr, _ := types.NewNetAddressString(s.NodeInfo().ListenAddr) book.AddAddress(addr, addr) - s.AddListener(NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger())) + s.AddListener(p2p.NewDefaultListener("tcp", s.NodeInfo().ListenAddr, true, log.TestingLogger())) } // start switches @@ -106,7 +120,7 @@ func TestPEXReactorRunning(t *testing.T) { } } -func assertSomePeersWithTimeout(t *testing.T, switches []*Switch, checkPeriod, timeout time.Duration) { +func assertSomePeersWithTimeout(t *testing.T, switches []*p2p.Switch, checkPeriod, timeout time.Duration) { ticker := time.NewTicker(checkPeriod) remaining := timeout for { @@ -151,13 +165,13 @@ func TestPEXReactorReceive(t *testing.T) { r := NewPEXReactor(book, &PEXReactorConfig{}) r.SetLogger(log.TestingLogger()) - peer := createRandomPeer(false) + peer := p2p.CreateRandomPeer(false) // we have to send a request to receive responses r.RequestAddrs(peer) size := book.Size() - addrs := []*NetAddress{peer.NodeInfo().NetAddress()} + addrs := []*types.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) r.Receive(PexChannel, peer, msg) assert.Equal(size+1, book.Size()) @@ -176,14 +190,14 @@ func TestPEXReactorRequestMessageAbuse(t *testing.T) { book.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw }) + sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) sw.AddReactor("PEX", r) r.SetSwitch(sw) r.SetLogger(log.TestingLogger()) peer := newMockPeer() - sw.peers.Add(peer) + p2p.AddPeerToSwitch(sw, peer) assert.True(sw.Peers().Has(peer.ID())) id := string(peer.ID()) @@ -215,14 +229,14 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { book.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{}) - sw := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { return sw }) + sw := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { return sw }) sw.SetLogger(log.TestingLogger()) sw.AddReactor("PEX", r) r.SetSwitch(sw) r.SetLogger(log.TestingLogger()) peer := newMockPeer() - sw.peers.Add(peer) + p2p.AddPeerToSwitch(sw, peer) assert.True(sw.Peers().Has(peer.ID())) id := string(peer.ID()) @@ -232,7 +246,7 @@ func TestPEXReactorAddrsMessageAbuse(t *testing.T) { assert.True(r.requestsSent.Has(id)) assert.True(sw.Peers().Has(peer.ID())) - addrs := []*NetAddress{peer.NodeInfo().NetAddress()} + addrs := []*types.NetAddress{peer.NodeInfo().NetAddress()} msg := wire.BinaryBytes(struct{ PexMessage }{&pexAddrsMessage{Addrs: addrs}}) // receive some addrs. should clear the request @@ -254,7 +268,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { book.SetLogger(log.TestingLogger()) // 1. create seed - seed := makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + seed := p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { sw.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{}) @@ -263,13 +277,13 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { sw.AddReactor("pex", r) return sw }) - seed.AddListener(NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) + seed.AddListener(p2p.NewDefaultListener("tcp", seed.NodeInfo().ListenAddr, true, log.TestingLogger())) err = seed.Start() require.Nil(t, err) defer seed.Stop() // 2. create usual peer - sw := makeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + sw := p2p.MakeSwitch(config, 1, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { sw.SetLogger(log.TestingLogger()) r := NewPEXReactor(book, &PEXReactorConfig{Seeds: []string{seed.NodeInfo().ListenAddr}}) @@ -283,7 +297,7 @@ func TestPEXReactorUsesSeedsIfNeeded(t *testing.T) { defer sw.Stop() // 3. check that peer at least connects to seed - assertSomePeersWithTimeout(t, []*Switch{sw}, 10*time.Millisecond, 10*time.Second) + assertSomePeersWithTimeout(t, []*p2p.Switch{sw}, 10*time.Millisecond, 10*time.Second) } func TestPEXReactorCrawlStatus(t *testing.T) { @@ -297,7 +311,7 @@ func TestPEXReactorCrawlStatus(t *testing.T) { pexR := NewPEXReactor(book, &PEXReactorConfig{SeedMode: true}) // Seed/Crawler mode uses data from the Switch - makeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *Switch) *Switch { + p2p.MakeSwitch(config, 0, "127.0.0.1", "123.123.123", func(i int, sw *p2p.Switch) *p2p.Switch { pexR.SetLogger(log.TestingLogger()) sw.SetLogger(log.TestingLogger().With("switch", i)) sw.AddReactor("pex", pexR) @@ -305,13 +319,13 @@ func TestPEXReactorCrawlStatus(t *testing.T) { }) // Create a peer, add it to the peer set and the addrbook. - peer := createRandomPeer(false) - pexR.Switch.peers.Add(peer) + peer := p2p.CreateRandomPeer(false) + p2p.AddPeerToSwitch(pexR.Switch, peer) addr1 := peer.NodeInfo().NetAddress() pexR.book.AddAddress(addr1, addr1) // Add a non-connected address to the book. - _, addr2 := createRoutableAddr() + _, addr2 := p2p.CreateRoutableAddr() pexR.book.AddAddress(addr2, addr1) // Get some peerInfos to crawl @@ -323,44 +337,15 @@ func TestPEXReactorCrawlStatus(t *testing.T) { // TODO: test } -func createRoutableAddr() (addr string, netAddr *NetAddress) { - for { - var err error - addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) - netAddr, err = NewNetAddressString(addr) - if err != nil { - panic(err) - } - if netAddr.Routable() { - break - } - } - return -} - -func createRandomPeer(outbound bool) *peer { - addr, netAddr := createRoutableAddr() - p := &peer{ - nodeInfo: NodeInfo{ - ListenAddr: netAddr.DialString(), - PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), - }, - outbound: outbound, - mconn: &MConnection{}, - } - p.SetLogger(log.TestingLogger().With("peer", addr)) - return p -} - type mockPeer struct { *cmn.BaseService pubKey crypto.PubKey - addr *NetAddress + addr *types.NetAddress outbound, persistent bool } func newMockPeer() mockPeer { - _, netAddr := createRoutableAddr() + _, netAddr := p2p.CreateRoutableAddr() mp := mockPeer{ addr: netAddr, pubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), @@ -370,17 +355,17 @@ func newMockPeer() mockPeer { return mp } -func (mp mockPeer) ID() ID { return PubKeyToID(mp.pubKey) } +func (mp mockPeer) ID() types.ID { return types.PubKeyToID(mp.pubKey) } func (mp mockPeer) IsOutbound() bool { return mp.outbound } func (mp mockPeer) IsPersistent() bool { return mp.persistent } -func (mp mockPeer) NodeInfo() NodeInfo { - return NodeInfo{ +func (mp mockPeer) NodeInfo() types.NodeInfo { + return types.NodeInfo{ PubKey: mp.pubKey, ListenAddr: mp.addr.DialString(), } } -func (mp mockPeer) Status() ConnectionStatus { return ConnectionStatus{} } -func (mp mockPeer) Send(byte, interface{}) bool { return false } -func (mp mockPeer) TrySend(byte, interface{}) bool { return false } -func (mp mockPeer) Set(string, interface{}) {} -func (mp mockPeer) Get(string) interface{} { return nil } +func (mp mockPeer) Status() tmconn.ConnectionStatus { return tmconn.ConnectionStatus{} } +func (mp mockPeer) Send(byte, interface{}) bool { return false } +func (mp mockPeer) TrySend(byte, interface{}) bool { return false } +func (mp mockPeer) Set(string, interface{}) {} +func (mp mockPeer) Get(string) interface{} { return nil } diff --git a/p2p/switch.go b/p2p/switch.go index db2e7d98..ec54478c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -11,6 +11,8 @@ import ( crypto "github.com/tendermint/go-crypto" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" cmn "github.com/tendermint/tmlibs/common" ) @@ -30,10 +32,11 @@ const ( reconnectBackOffBaseSeconds = 3 ) -var ( - ErrSwitchDuplicatePeer = errors.New("Duplicate peer") - ErrSwitchConnectToSelf = errors.New("Connect to self") -) +//----------------------------------------------------------------------------- + +type AddrBook interface { + AddAddress(addr *types.NetAddress, src *types.NetAddress) +} //----------------------------------------------------------------------------- @@ -48,12 +51,12 @@ type Switch struct { peerConfig *PeerConfig listeners []Listener reactors map[string]Reactor - chDescs []*ChannelDescriptor + chDescs []*tmconn.ChannelDescriptor reactorsByCh map[byte]Reactor peers *PeerSet dialing *cmn.CMap - nodeInfo NodeInfo // our node info - nodeKey *NodeKey // our node privkey + nodeInfo types.NodeInfo // our node info + nodeKey *types.NodeKey // our node privkey filterConnByAddr func(net.Addr) error filterConnByPubKey func(crypto.PubKey) error @@ -66,7 +69,7 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { config: config, peerConfig: DefaultPeerConfig(), reactors: make(map[string]Reactor), - chDescs: make([]*ChannelDescriptor, 0), + chDescs: make([]*tmconn.ChannelDescriptor, 0), reactorsByCh: make(map[byte]Reactor), peers: NewPeerSet(), dialing: cmn.NewCMap(), @@ -77,10 +80,10 @@ func NewSwitch(config *cfg.P2PConfig) *Switch { sw.rng = rand.New(rand.NewSource(cmn.RandInt64())) // TODO: collapse the peerConfig into the config ? - sw.peerConfig.MConfig.flushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond + sw.peerConfig.MConfig.FlushThrottle = time.Duration(config.FlushThrottleTimeout) * time.Millisecond sw.peerConfig.MConfig.SendRate = config.SendRate sw.peerConfig.MConfig.RecvRate = config.RecvRate - sw.peerConfig.MConfig.maxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize + sw.peerConfig.MConfig.MaxMsgPacketPayloadSize = config.MaxMsgPacketPayloadSize sw.BaseService = *cmn.NewBaseService(nil, "P2P Switch", sw) return sw @@ -140,19 +143,19 @@ func (sw *Switch) IsListening() bool { // SetNodeInfo sets the switch's NodeInfo for checking compatibility and handshaking with other nodes. // NOTE: Not goroutine safe. -func (sw *Switch) SetNodeInfo(nodeInfo NodeInfo) { +func (sw *Switch) SetNodeInfo(nodeInfo types.NodeInfo) { sw.nodeInfo = nodeInfo } // NodeInfo returns the switch's NodeInfo. // NOTE: Not goroutine safe. -func (sw *Switch) NodeInfo() NodeInfo { +func (sw *Switch) NodeInfo() types.NodeInfo { return sw.nodeInfo } // SetNodeKey sets the switch's private key for authenticated encryption. // NOTE: Not goroutine safe. -func (sw *Switch) SetNodeKey(nodeKey *NodeKey) { +func (sw *Switch) SetNodeKey(nodeKey *types.NodeKey) { sw.nodeKey = nodeKey } @@ -311,13 +314,13 @@ func (sw *Switch) reconnectToPeer(peer Peer) { // Dialing // IsDialing returns true if the switch is currently dialing the given ID. -func (sw *Switch) IsDialing(id ID) bool { +func (sw *Switch) IsDialing(id types.ID) bool { return sw.dialing.Has(string(id)) } // DialPeersAsync dials a list of peers asynchronously in random order (optionally, making them persistent). -func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent bool) error { - netAddrs, errs := NewNetAddressStrings(peers) +func (sw *Switch) DialPeersAsync(addrBook AddrBook, peers []string, persistent bool) error { + netAddrs, errs := types.NewNetAddressStrings(peers) for _, err := range errs { sw.Logger.Error("Error in peer's address", "err", err) } @@ -330,6 +333,7 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent if netAddr.Same(ourAddr) { continue } + // TODO: move this out of here ? addrBook.AddAddress(netAddr, ourAddr) } } @@ -353,7 +357,7 @@ func (sw *Switch) DialPeersAsync(addrBook *AddrBook, peers []string, persistent // DialPeerWithAddress dials the given peer and runs sw.addPeer if it connects and authenticates successfully. // If `persistent == true`, the switch will always try to reconnect to this peer if the connection ever fails. -func (sw *Switch) DialPeerWithAddress(addr *NetAddress, persistent bool) (Peer, error) { +func (sw *Switch) DialPeerWithAddress(addr *types.NetAddress, persistent bool) (Peer, error) { sw.dialing.Set(string(addr.ID), addr) defer sw.dialing.Delete(string(addr.ID)) return sw.addOutboundPeerWithConfig(addr, sw.peerConfig, persistent) @@ -439,7 +443,7 @@ func (sw *Switch) addInboundPeerWithConfig(conn net.Conn, config *PeerConfig) er // dial the peer; make secret connection; authenticate against the dialed ID; // add the peer. -func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig, persistent bool) (Peer, error) { +func (sw *Switch) addOutboundPeerWithConfig(addr *types.NetAddress, config *PeerConfig, persistent bool) (Peer, error) { sw.Logger.Info("Dialing peer", "address", addr) peer, err := newOutboundPeer(addr, sw.reactorsByCh, sw.chDescs, sw.StopPeerForError, sw.nodeKey.PrivKey, config, persistent) if err != nil { @@ -453,7 +457,7 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig peer.Logger.Info("Dialed peer with unknown ID - unable to authenticate", "addr", addr) } else if addr.ID != peer.ID() { peer.CloseConn() - return nil, fmt.Errorf("Failed to authenticate peer %v. Connected to peer with ID %s", addr, peer.ID()) + return nil, types.ErrSwitchAuthenticationFailure{addr, peer.ID()} } err = sw.addPeer(peer) @@ -474,12 +478,12 @@ func (sw *Switch) addOutboundPeerWithConfig(addr *NetAddress, config *PeerConfig func (sw *Switch) addPeer(peer *peer) error { // Avoid self if sw.nodeKey.ID() == peer.ID() { - return ErrSwitchConnectToSelf + return types.ErrSwitchConnectToSelf } // Avoid duplicate if sw.peers.Has(peer.ID()) { - return ErrSwitchDuplicatePeer + return types.ErrSwitchDuplicatePeer } diff --git a/p2p/switch_test.go b/p2p/switch_test.go index a729698e..ae7e89e7 100644 --- a/p2p/switch_test.go +++ b/p2p/switch_test.go @@ -16,6 +16,8 @@ import ( "github.com/tendermint/tmlibs/log" cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" ) var ( @@ -28,7 +30,7 @@ func init() { } type PeerMessage struct { - PeerID ID + PeerID types.ID Bytes []byte Counter int } @@ -37,7 +39,7 @@ type TestReactor struct { BaseReactor mtx sync.Mutex - channels []*ChannelDescriptor + channels []*tmconn.ChannelDescriptor peersAdded []Peer peersRemoved []Peer logMessages bool @@ -45,7 +47,7 @@ type TestReactor struct { msgsReceived map[byte][]PeerMessage } -func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReactor { +func NewTestReactor(channels []*tmconn.ChannelDescriptor, logMessages bool) *TestReactor { tr := &TestReactor{ channels: channels, logMessages: logMessages, @@ -56,7 +58,7 @@ func NewTestReactor(channels []*ChannelDescriptor, logMessages bool) *TestReacto return tr } -func (tr *TestReactor) GetChannels() []*ChannelDescriptor { +func (tr *TestReactor) GetChannels() []*tmconn.ChannelDescriptor { return tr.channels } @@ -92,7 +94,7 @@ func (tr *TestReactor) getMsgs(chID byte) []PeerMessage { // convenience method for creating two switches connected to each other. // XXX: note this uses net.Pipe and not a proper TCP conn -func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { +func MakeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switch, *Switch) { // Create two switches that will be interconnected. switches := MakeConnectedSwitches(config, 2, initSwitch, Connect2Switches) return switches[0], switches[1] @@ -100,11 +102,11 @@ func makeSwitchPair(t testing.TB, initSwitch func(int, *Switch) *Switch) (*Switc func initSwitchFunc(i int, sw *Switch) *Switch { // Make two reactors of two channels each - sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("foo", NewTestReactor([]*tmconn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, {ID: byte(0x01), Priority: 10}, }, true)) - sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("bar", NewTestReactor([]*tmconn.ChannelDescriptor{ {ID: byte(0x02), Priority: 10}, {ID: byte(0x03), Priority: 10}, }, true)) @@ -112,7 +114,7 @@ func initSwitchFunc(i int, sw *Switch) *Switch { } func TestSwitches(t *testing.T) { - s1, s2 := makeSwitchPair(t, initSwitchFunc) + s1, s2 := MakeSwitchPair(t, initSwitchFunc) defer s1.Stop() defer s2.Stop() @@ -156,12 +158,12 @@ func assertMsgReceivedWithTimeout(t *testing.T, msg string, channel byte, reacto } func TestConnAddrFilter(t *testing.T) { - s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() - c1, c2 := netPipe() + c1, c2 := tmconn.NetPipe() s1.SetAddrFilter(func(addr net.Addr) error { if addr.String() == c1.RemoteAddr().String() { @@ -192,12 +194,12 @@ func assertNoPeersAfterTimeout(t *testing.T, sw *Switch, timeout time.Duration) } func TestConnPubKeyFilter(t *testing.T) { - s1 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) - s2 := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s1 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + s2 := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) defer s1.Stop() defer s2.Stop() - c1, c2 := netPipe() + c1, c2 := tmconn.NetPipe() // set pubkey filter s1.SetPubKeyFilter(func(pubkey crypto.PubKey) error { @@ -224,7 +226,7 @@ func TestConnPubKeyFilter(t *testing.T) { func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -251,7 +253,7 @@ func TestSwitchStopsNonPersistentPeerOnError(t *testing.T) { func TestSwitchReconnectsToPersistentPeer(t *testing.T) { assert, require := assert.New(t), require.New(t) - sw := makeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) + sw := MakeSwitch(config, 1, "testing", "123.123.123", initSwitchFunc) err := sw.Start() if err != nil { t.Error(err) @@ -302,13 +304,13 @@ func TestSwitchFullConnectivity(t *testing.T) { func BenchmarkSwitches(b *testing.B) { b.StopTimer() - s1, s2 := makeSwitchPair(b, func(i int, sw *Switch) *Switch { + s1, s2 := MakeSwitchPair(b, func(i int, sw *Switch) *Switch { // Make bar reactors of bar channels each - sw.AddReactor("foo", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("foo", NewTestReactor([]*tmconn.ChannelDescriptor{ {ID: byte(0x00), Priority: 10}, {ID: byte(0x01), Priority: 10}, }, false)) - sw.AddReactor("bar", NewTestReactor([]*ChannelDescriptor{ + sw.AddReactor("bar", NewTestReactor([]*tmconn.ChannelDescriptor{ {ID: byte(0x02), Priority: 10}, {ID: byte(0x03), Priority: 10}, }, false)) diff --git a/p2p/test_util.go b/p2p/test_util.go index dca23a0e..aad6fb23 100644 --- a/p2p/test_util.go +++ b/p2p/test_util.go @@ -5,11 +5,47 @@ import ( "net" crypto "github.com/tendermint/go-crypto" - cfg "github.com/tendermint/tendermint/config" cmn "github.com/tendermint/tmlibs/common" "github.com/tendermint/tmlibs/log" + + cfg "github.com/tendermint/tendermint/config" + "github.com/tendermint/tendermint/p2p/tmconn" + "github.com/tendermint/tendermint/p2p/types" ) +func AddPeerToSwitch(sw *Switch, peer Peer) { + sw.peers.Add(peer) +} + +func CreateRandomPeer(outbound bool) *peer { + addr, netAddr := CreateRoutableAddr() + p := &peer{ + nodeInfo: types.NodeInfo{ + ListenAddr: netAddr.DialString(), + PubKey: crypto.GenPrivKeyEd25519().Wrap().PubKey(), + }, + outbound: outbound, + mconn: &tmconn.MConnection{}, + } + p.SetLogger(log.TestingLogger().With("peer", addr)) + return p +} + +func CreateRoutableAddr() (addr string, netAddr *types.NetAddress) { + for { + var err error + addr = cmn.Fmt("%X@%v.%v.%v.%v:46656", cmn.RandBytes(20), rand.Int()%256, rand.Int()%256, rand.Int()%256, rand.Int()%256) + netAddr, err = types.NewNetAddressString(addr) + if err != nil { + panic(err) + } + if netAddr.Routable() { + break + } + } + return +} + //------------------------------------------------------------------ // Connects switches via arbitrary net.Conn. Used for testing. @@ -20,7 +56,7 @@ import ( func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Switch) *Switch, connect func([]*Switch, int, int)) []*Switch { switches := make([]*Switch, n) for i := 0; i < n; i++ { - switches[i] = makeSwitch(cfg, i, "testing", "123.123.123", initSwitch) + switches[i] = MakeSwitch(cfg, i, "testing", "123.123.123", initSwitch) } if err := StartSwitches(switches); err != nil { @@ -42,7 +78,7 @@ func MakeConnectedSwitches(cfg *cfg.P2PConfig, n int, initSwitch func(int, *Swit func Connect2Switches(switches []*Switch, i, j int) { switchI := switches[i] switchJ := switches[j] - c1, c2 := netPipe() + c1, c2 := tmconn.NetPipe() doneCh := make(chan struct{}) go func() { err := switchI.addPeerWithConnection(c1) @@ -91,16 +127,16 @@ func StartSwitches(switches []*Switch) error { return nil } -func makeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { +func MakeSwitch(cfg *cfg.P2PConfig, i int, network, version string, initSwitch func(int, *Switch) *Switch) *Switch { // new switch, add reactors // TODO: let the config be passed in? - nodeKey := &NodeKey{ + nodeKey := &types.NodeKey{ PrivKey: crypto.GenPrivKeyEd25519().Wrap(), } s := NewSwitch(cfg) s.SetLogger(log.TestingLogger()) s = initSwitch(i, s) - s.SetNodeInfo(NodeInfo{ + s.SetNodeInfo(types.NodeInfo{ PubKey: nodeKey.PubKey(), Moniker: cmn.Fmt("switch%d", i), Network: network, diff --git a/p2p/conn_go110.go b/p2p/tmconn/conn_go110.go similarity index 86% rename from p2p/conn_go110.go rename to p2p/tmconn/conn_go110.go index 2fca7c3d..75e55d85 100644 --- a/p2p/conn_go110.go +++ b/p2p/tmconn/conn_go110.go @@ -1,6 +1,6 @@ // +build go1.10 -package p2p +package tmconn // Go1.10 has a proper net.Conn implementation that // has the SetDeadline method implemented as per @@ -10,6 +10,6 @@ package p2p import "net" -func netPipe() (net.Conn, net.Conn) { +func NetPipe() (net.Conn, net.Conn) { return net.Pipe() } diff --git a/p2p/conn_notgo110.go b/p2p/tmconn/conn_notgo110.go similarity index 93% rename from p2p/conn_notgo110.go rename to p2p/tmconn/conn_notgo110.go index a5c2f741..bb72d64a 100644 --- a/p2p/conn_notgo110.go +++ b/p2p/tmconn/conn_notgo110.go @@ -1,6 +1,6 @@ // +build !go1.10 -package p2p +package tmconn import ( "net" @@ -24,7 +24,7 @@ func (p *pipe) SetDeadline(t time.Time) error { return nil } -func netPipe() (net.Conn, net.Conn) { +func NetPipe() (net.Conn, net.Conn) { p1, p2 := net.Pipe() return &pipe{p1}, &pipe{p2} } diff --git a/p2p/connection.go b/p2p/tmconn/connection.go similarity index 98% rename from p2p/connection.go rename to p2p/tmconn/connection.go index dcb66096..92c48c36 100644 --- a/p2p/connection.go +++ b/p2p/tmconn/connection.go @@ -1,4 +1,4 @@ -package p2p +package tmconn import ( "bufio" @@ -97,13 +97,13 @@ type MConnConfig struct { SendRate int64 `mapstructure:"send_rate"` RecvRate int64 `mapstructure:"recv_rate"` - maxMsgPacketPayloadSize int + MaxMsgPacketPayloadSize int - flushThrottle time.Duration + FlushThrottle time.Duration } func (cfg *MConnConfig) maxMsgPacketTotalSize() int { - return cfg.maxMsgPacketPayloadSize + maxMsgPacketOverheadSize + return cfg.MaxMsgPacketPayloadSize + maxMsgPacketOverheadSize } // DefaultMConnConfig returns the default config. @@ -111,8 +111,8 @@ func DefaultMConnConfig() *MConnConfig { return &MConnConfig{ SendRate: defaultSendRate, RecvRate: defaultRecvRate, - maxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, - flushThrottle: defaultFlushThrottle, + MaxMsgPacketPayloadSize: defaultMaxMsgPacketPayloadSize, + FlushThrottle: defaultFlushThrottle, } } @@ -171,7 +171,7 @@ func (c *MConnection) OnStart() error { return err } c.quit = make(chan struct{}) - c.flushTimer = cmn.NewThrottleTimer("flush", c.config.flushThrottle) + c.flushTimer = cmn.NewThrottleTimer("flush", c.config.FlushThrottle) c.pingTimer = cmn.NewRepeatTimer("ping", pingTimeout) c.chStatsTimer = cmn.NewRepeatTimer("chStats", updateStats) go c.sendRoutine() @@ -586,7 +586,7 @@ func newChannel(conn *MConnection, desc ChannelDescriptor) *Channel { desc: desc, sendQueue: make(chan []byte, desc.SendQueueCapacity), recving: make([]byte, 0, desc.RecvBufferCapacity), - maxMsgPacketPayloadSize: conn.config.maxMsgPacketPayloadSize, + maxMsgPacketPayloadSize: conn.config.MaxMsgPacketPayloadSize, } } diff --git a/p2p/connection_test.go b/p2p/tmconn/connection_test.go similarity index 97% rename from p2p/connection_test.go rename to p2p/tmconn/connection_test.go index 2a64764e..65ae017c 100644 --- a/p2p/connection_test.go +++ b/p2p/tmconn/connection_test.go @@ -1,4 +1,4 @@ -package p2p +package tmconn import ( "net" @@ -31,7 +31,7 @@ func createMConnectionWithCallbacks(conn net.Conn, onReceive func(chID byte, msg func TestMConnectionSend(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -64,7 +64,7 @@ func TestMConnectionSend(t *testing.T) { func TestMConnectionReceive(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -102,7 +102,7 @@ func TestMConnectionReceive(t *testing.T) { func TestMConnectionStatus(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -119,7 +119,7 @@ func TestMConnectionStatus(t *testing.T) { func TestMConnectionStopsAndReturnsError(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() // nolint: errcheck defer client.Close() // nolint: errcheck @@ -152,7 +152,7 @@ func TestMConnectionStopsAndReturnsError(t *testing.T) { } func newClientAndServerConnsForReadErrors(require *require.Assertions, chOnErr chan struct{}) (*MConnection, *MConnection) { - server, client := netPipe() + server, client := NetPipe() onReceive := func(chID byte, msgBytes []byte) {} onError := func(r interface{}) {} @@ -283,7 +283,7 @@ func TestMConnectionReadErrorUnknownMsgType(t *testing.T) { func TestMConnectionTrySend(t *testing.T) { assert, require := assert.New(t), require.New(t) - server, client := netPipe() + server, client := NetPipe() defer server.Close() defer client.Close() diff --git a/p2p/secret_connection.go b/p2p/tmconn/secret_connection.go similarity index 99% rename from p2p/secret_connection.go rename to p2p/tmconn/secret_connection.go index f022d9c3..e1a3e050 100644 --- a/p2p/secret_connection.go +++ b/p2p/tmconn/secret_connection.go @@ -4,7 +4,7 @@ // is known ahead of time, and thus we are technically // still vulnerable to MITM. (TODO!) // See docs/sts-final.pdf for more info -package p2p +package tmconn import ( "bytes" diff --git a/p2p/secret_connection_test.go b/p2p/tmconn/secret_connection_test.go similarity index 99% rename from p2p/secret_connection_test.go rename to p2p/tmconn/secret_connection_test.go index 5e0611a8..5ef2c410 100644 --- a/p2p/secret_connection_test.go +++ b/p2p/tmconn/secret_connection_test.go @@ -1,4 +1,4 @@ -package p2p +package tmconn import ( "io" diff --git a/p2p/types/errors.go b/p2p/types/errors.go new file mode 100644 index 00000000..ead2a833 --- /dev/null +++ b/p2p/types/errors.go @@ -0,0 +1,20 @@ +package types + +import ( + "errors" + "fmt" +) + +var ( + ErrSwitchDuplicatePeer = errors.New("Duplicate peer") + ErrSwitchConnectToSelf = errors.New("Connect to self") +) + +type ErrSwitchAuthenticationFailure struct { + Dialed *NetAddress + Got ID +} + +func (e ErrSwitchAuthenticationFailure) Error() string { + return fmt.Sprintf("Failed to authenticate peer. Dialed %v, but got peer with ID %s", e.Dialed, e.Got) +} diff --git a/p2p/key.go b/p2p/types/key.go similarity index 99% rename from p2p/key.go rename to p2p/types/key.go index ea0f0b07..4ce5ee50 100644 --- a/p2p/key.go +++ b/p2p/types/key.go @@ -1,4 +1,4 @@ -package p2p +package types import ( "bytes" diff --git a/p2p/key_test.go b/p2p/types/key_test.go similarity index 98% rename from p2p/key_test.go rename to p2p/types/key_test.go index c2e1f3e0..f18fb3b9 100644 --- a/p2p/key_test.go +++ b/p2p/types/key_test.go @@ -1,4 +1,4 @@ -package p2p +package types import ( "bytes" diff --git a/p2p/netaddress.go b/p2p/types/netaddress.go similarity index 99% rename from p2p/netaddress.go rename to p2p/types/netaddress.go index 333d16e5..f0b397e8 100644 --- a/p2p/netaddress.go +++ b/p2p/types/netaddress.go @@ -2,7 +2,7 @@ // Originally Copyright (c) 2013-2014 Conformal Systems LLC. // https://github.com/conformal/btcd/blob/master/LICENSE -package p2p +package types import ( "encoding/hex" diff --git a/p2p/netaddress_test.go b/p2p/types/netaddress_test.go similarity index 99% rename from p2p/netaddress_test.go rename to p2p/types/netaddress_test.go index 6c1930a2..0119cc3b 100644 --- a/p2p/netaddress_test.go +++ b/p2p/types/netaddress_test.go @@ -1,4 +1,4 @@ -package p2p +package types import ( "net" diff --git a/p2p/types.go b/p2p/types/node_info.go similarity index 97% rename from p2p/types.go rename to p2p/types/node_info.go index d93adc9b..10c48685 100644 --- a/p2p/types.go +++ b/p2p/types/node_info.go @@ -1,4 +1,4 @@ -package p2p +package types import ( "fmt" @@ -11,6 +11,10 @@ import ( const maxNodeInfoSize = 10240 // 10Kb +func MaxNodeInfoSize() int { + return maxNodeInfoSize +} + // NodeInfo is the basic node information exchanged // between two peers during the Tendermint P2P handshake. type NodeInfo struct {