diff --git a/main.go b/main.go index 240b16a4..4f9b2e37 100644 --- a/main.go +++ b/main.go @@ -112,10 +112,12 @@ func (n *Node) switchEventsHandler() { switch swEvent.(type) { case p2p.SwitchEventNewPeer: event := swEvent.(p2p.SwitchEventNewPeer) - n.sendOurExternalAddrs(event.Peer) - if n.book.NeedMoreAddrs() { - pkt := p2p.NewPacket(p2p.PexCh, p2p.NewPexRequestMessage()) - event.Peer.TrySend(pkt) + if event.Peer.IsOutbound() { + n.sendOurExternalAddrs(event.Peer) + if n.book.NeedMoreAddrs() { + pkt := p2p.NewPacket(p2p.PexCh, p2p.NewPexRequestMessage()) + event.Peer.TrySend(pkt) + } } case p2p.SwitchEventDonePeer: // TODO diff --git a/p2p/addrbook.go b/p2p/addrbook.go index cdd09bd4..9122d939 100644 --- a/p2p/addrbook.go +++ b/p2p/addrbook.go @@ -83,12 +83,15 @@ const ( // days since the last success before we will consider evicting an address. minBadDays = 7 - // max addresses that we will send in response to a GetSelection - getSelectionMax = 2500 - - // % of total addresses known that we will share with a call to GetSelection + // % of total addresses known returned by GetSelection. getSelectionPercent = 23 + // min addresses that must be returned by GetSelection. Useful for bootstrapping. + minGetSelection = 32 + + // max addresses returned by GetSelection + maxGetSelection = 2500 + // current version of the on-disk format. serializationVersion = 1 ) @@ -264,10 +267,11 @@ func (a *AddrBook) GetSelection() []*NetAddress { i++ } - numAddresses := len(allAddr) * getSelectionPercent / 100 - if numAddresses > getSelectionMax { - numAddresses = getSelectionMax - } + + numAddresses := MaxInt( + MinInt(minGetSelection, len(allAddr)), + len(allAddr) * getSelectionPercent / 100) + numAddresses = MinInt(maxGetSelection, numAddresses) // Fisher-Yates shuffle the array. We only need to do the first // `numAddresses' since we are throwing the rest. diff --git a/p2p/connection.go b/p2p/connection.go index 327c3e37..9af897bd 100644 --- a/p2p/connection.go +++ b/p2p/connection.go @@ -111,7 +111,7 @@ func (c *Connection) Send(pkt Packet) bool { } func (c *Connection) String() string { - return fmt.Sprintf("Connection{%v}", c.conn.RemoteAddr()) + return fmt.Sprintf("/%v/", c.conn.RemoteAddr()) } func (c *Connection) flush() { @@ -140,7 +140,6 @@ func (c *Connection) _recover() { // sendHandler pulls from .sendQueue and writes to .bufWriter func (c *Connection) sendHandler() { - log.Debug("%v sendHandler", c) defer c._recover() FOR_LOOP: @@ -148,7 +147,6 @@ FOR_LOOP: var err error select { case sendPkt := <-c.sendQueue: - log.Debug("Found pkt from sendQueue. Writing pkt to underlying connection") _, err = packetTypeMessage.WriteTo(c.bufWriter) if err != nil { break @@ -159,11 +157,11 @@ FOR_LOOP: c.flush() case <-c.pingRepeatTimer.Ch: _, err = packetTypePing.WriteTo(c.bufWriter) - log.Debug("Send [Ping] -> %v", c) + // log.Debug("PING %v", c) c.flush() case <-c.pong: _, err = packetTypePong.WriteTo(c.bufWriter) - log.Debug("Send [Pong] -> %v", c) + // log.Debug("PONG %v", c) c.flush() case <-c.quit: break FOR_LOOP @@ -179,14 +177,12 @@ FOR_LOOP: } } - log.Debug("%v sendHandler done", c) // Cleanup } // recvHandler reads from .bufReader and pushes to the appropriate // channel's recvQueue. func (c *Connection) recvHandler() { - log.Debug("%v recvHandler", c) defer c._recover() FOR_LOOP: @@ -206,8 +202,6 @@ FOR_LOOP: c.Stop() } break FOR_LOOP - } else { - log.Debug("Found pktType %v", pktType) } switch pktType { @@ -217,7 +211,6 @@ FOR_LOOP: c.pong <- struct{}{} case packetTypePong: // do nothing - log.Debug("[%v] Received Pong", c) case packetTypeMessage: pkt, err := ReadPacketSafe(c.bufReader) if err != nil { @@ -239,7 +232,6 @@ FOR_LOOP: c.pingRepeatTimer.Reset() } - log.Debug("%v recvHandler done", c) // Cleanup close(c.pong) for _ = range c.pong { diff --git a/p2p/peer.go b/p2p/peer.go index 701c56d0..f3149ca4 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -75,7 +75,6 @@ func (p *Peer) Channel(chName string) *Channel { // TrySend returns true if the packet was successfully queued. // Returning true does not imply that the packet will be sent. func (p *Peer) TrySend(pkt Packet) bool { - log.Debug("TrySend [%v] -> %v", pkt, p) channel := p.Channel(string(pkt.Channel)) sendQueue := channel.sendQueue @@ -85,14 +84,15 @@ func (p *Peer) TrySend(pkt Packet) bool { select { case sendQueue <- pkt: + log.Debug("SEND %v: %v", p, pkt) return true default: // buffer full + log.Debug("FAIL SEND %v: %v", p, pkt) return false } } func (p *Peer) Send(pkt Packet) bool { - log.Debug("Send [%v] -> %v", pkt, p) channel := p.Channel(string(pkt.Channel)) sendQueue := channel.sendQueue @@ -101,6 +101,7 @@ func (p *Peer) Send(pkt Packet) bool { } sendQueue <- pkt + log.Debug("SEND %v: %v", p, pkt) return true } @@ -109,14 +110,18 @@ func (p *Peer) WriteTo(w io.Writer) (n int64, err error) { } func (p *Peer) String() string { - return fmt.Sprintf("Peer{%v-%v,o:%v}", p.LocalAddress(), p.RemoteAddress(), p.outbound) + if p.outbound { + return fmt.Sprintf("P(->%v)", p.conn) + } else { + return fmt.Sprintf("P(%v->)", p.conn) + } } // sendHandler pulls from a channel and pushes to the connection. // Each channel gets its own sendHandler goroutine; // Golang's channel implementation handles the scheduling. func (p *Peer) sendHandler(chName string) { - log.Debug("%v sendHandler [%v]", p, chName) + // log.Debug("%v sendHandler [%v]", p, chName) channel := p.channels[chName] sendQueue := channel.sendQueue FOR_LOOP: @@ -125,14 +130,13 @@ FOR_LOOP: case <-p.quit: break FOR_LOOP case pkt := <-sendQueue: - log.Debug("Sending packet to peer sendQueue") // blocks until the connection is Stop'd, // which happens when this peer is Stop'd. p.conn.Send(pkt) } } - log.Debug("%v sendHandler [%v] closed", p, chName) + // log.Debug("%v sendHandler [%v] closed", p, chName) // Cleanup } @@ -141,7 +145,7 @@ FOR_LOOP: // Many peers have goroutines that push to the same pktRecvQueue. // Golang's channel implementation handles the scheduling. func (p *Peer) recvHandler(chName string, pktRecvQueue chan<- *InboundPacket) { - log.Debug("%v recvHandler [%v]", p, chName) + // log.Debug("%v recvHandler [%v]", p, chName) channel := p.channels[chName] recvQueue := channel.recvQueue @@ -166,7 +170,7 @@ FOR_LOOP: } } - log.Debug("%v recvHandler [%v] closed", p, chName) + // log.Debug("%v recvHandler [%v] closed", p, chName) // Cleanup } @@ -222,7 +226,6 @@ type Packet struct { func NewPacket(chName String, msg Binary) Packet { msgBytes := BinaryBytes(msg) - log.Debug("NewPacket msg bytes: %X", msgBytes) return Packet{ Channel: chName, Bytes: msgBytes, @@ -253,7 +256,6 @@ func ReadPacketSafe(r io.Reader) (pkt Packet, err error) { if err != nil { return } - log.Debug("ReadPacket* msg bytes: %X", bytes) return Packet{Channel: chName, Bytes: bytes}, nil } diff --git a/p2p/peer_manager.go b/p2p/peer_manager.go index 9f71fadb..4b48141f 100644 --- a/p2p/peer_manager.go +++ b/p2p/peer_manager.go @@ -3,6 +3,7 @@ package p2p import ( "bytes" "errors" + "fmt" "io" "sync/atomic" "time" @@ -95,7 +96,6 @@ func (pm *PeerManager) ensurePeers() { for j := 0; i < 3; j++ { picked = pm.book.PickAddress(newBias) if picked == nil { - log.Debug("Empty addrbook.") return } if toDial.Has(picked.String()) || @@ -179,7 +179,7 @@ const ( // TODO: check for unnecessary extra bytes at the end. func decodeMessage(bz ByteSlice) (msg Message) { - log.Debug("decoding msg bytes: %X", bz) + // log.Debug("decoding msg bytes: %X", bz) switch Byte(bz[0]) { case pexTypeRequest: return &PexRequestMessage{} @@ -207,6 +207,10 @@ func (m *PexRequestMessage) WriteTo(w io.Writer) (n int64, err error) { return } +func (m *PexRequestMessage) String() string { + return "[PexRequest]" +} + /* A message with announced peer addresses. */ @@ -234,3 +238,7 @@ func (m *PexAddrsMessage) WriteTo(w io.Writer) (n int64, err error) { } return } + +func (m *PexAddrsMessage) String() string { + return fmt.Sprintf("[PexAddrs %v]", m.Addrs) +} diff --git a/p2p/switch.go b/p2p/switch.go index d7549ffa..01ab970c 100644 --- a/p2p/switch.go +++ b/p2p/switch.go @@ -83,7 +83,6 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, return nil, ErrSwitchStopped } - log.Info("Adding peer with connection: %v, outbound: %v", conn, outbound) // Create channels for peer channels := map[string]*Channel{} for _, chDesc := range s.channels { @@ -94,7 +93,7 @@ func (s *Switch) AddPeerWithConnection(conn *Connection, outbound bool) (*Peer, // Add the peer to .peers if s.peers.Add(peer) { - log.Debug("Adding: %v", peer) + log.Info("+ %v", peer) } else { log.Info("Ignoring duplicate: %v", peer) return nil, ErrSwitchDuplicatePeer @@ -178,7 +177,7 @@ func (s *Switch) Receive(chName string) *InboundPacket { case <-s.quit: return nil case inPacket := <-q: - log.Debug("Received packet on [%v]", chName) + log.Debug("RECV %v", inPacket) return inPacket } } @@ -204,7 +203,7 @@ func (s *Switch) Peers() IPeerSet { // Disconnect from a peer due to external error. // TODO: make record depending on reason. func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { - log.Info("%v errored: %v", peer, reason) + log.Info("- %v !! reason: %v", peer, reason) s.peers.Remove(peer) peer.stop() @@ -215,6 +214,7 @@ func (s *Switch) StopPeerForError(peer *Peer, reason interface{}) { // Disconnect from a peer gracefully. // TODO: handle graceful disconnects. func (s *Switch) StopPeerGracefully(peer *Peer) { + log.Info("- %v", peer) s.peers.Remove(peer) peer.stop()