From 2c76dd19544670504b56e05b06ea0288ce6bf56d Mon Sep 17 00:00:00 2001 From: StephenButtolph Date: Mon, 20 Apr 2020 22:33:33 -0400 Subject: [PATCH] wip --- networking/handshake_handlers.go | 90 ++++++++++++++++++++++---------- 1 file changed, 62 insertions(+), 28 deletions(-) diff --git a/networking/handshake_handlers.go b/networking/handshake_handlers.go index 4ba44cc..daa0aff 100644 --- a/networking/handshake_handlers.go +++ b/networking/handshake_handlers.go @@ -114,16 +114,16 @@ type Handshake struct { // Connections that I have added by IP, but haven't gotten an ID from requestedLock sync.Mutex requested map[string]struct{} - requestedTimeout timer.TimeoutManager + requestedTimeout timer.TimeoutManager // keys are hashes of the ip:port string // Connections that I have added as a peer, but haven't gotten a version // message from pending Connections - versionTimeout timer.TimeoutManager + versionTimeout timer.TimeoutManager // keys are the peer IDs // Connections that I have gotten a valid version message from connections Connections - reconnectTimeout timer.TimeoutManager + reconnectTimeout timer.TimeoutManager // keys are the peer IDs // IPs of nodes I'm connected to will be repeatedly gossiped throughout the network peerListGossiper *timer.Repeater @@ -193,6 +193,8 @@ func (nm *Handshake) ConnectTo(peer salticidae.PeerID, stakerID ids.ShortID, add return } + nm.log.Info("Attempting to connect to %s", stakerID) + nm.net.AddPeer(peer) nm.net.SetPeerAddr(peer, addr) nm.net.ConnPeer(peer, 600, 1) @@ -205,7 +207,10 @@ func (nm *Handshake) ConnectTo(peer salticidae.PeerID, stakerID ids.ShortID, add nm.reconnectTimeout.Put(peerID, func() { nm.pending.Remove(peer, stakerID) + nm.connections.Remove(peer, stakerID) nm.net.DelPeer(peer) + + nm.numPeers.Set(float64(nm.connections.Len())) }) } @@ -217,11 +222,11 @@ func (nm *Handshake) Connect(addr salticidae.NetAddr) { return } - nm.log.Debug("Adding peer %s", ip) + nm.log.Info("Adding peer %s", ip) if !nm.enableStaking { peer := salticidae.NewPeerIDFromNetAddr(addr, true) - nm.ConnectTo(peer, addr) + nm.ConnectTo(peer, toShortID(ip), addr) return } @@ -232,6 +237,14 @@ func (nm *Handshake) Connect(addr salticidae.NetAddr) { nm.requestedLock.Lock() defer nm.requestedLock.Unlock() + if *count == 600 { + nm.requested[ipStr] = struct{}{} + } + + if _, exists := nm.requested[ipStr]; !exists { + return + } + if *count <= 0 { delete(nm.requested, ipStr) return @@ -242,12 +255,13 @@ func (nm *Handshake) Connect(addr salticidae.NetAddr) { return } - nm.log.Verbo("Attempting to discover peer at %s", ipStr) - - nm.requested[ipStr] = struct{}{} + nm.log.Info("Attempting to discover peer at %s", ipStr) msgNet := nm.net.AsMsgNetwork() msgNet.Connect(addr) + + ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr))) + nm.requestedTimeout.Put(ipID, *handler) } (*handler)() } @@ -397,21 +411,27 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.P return connected } + HandshakeNet.requestedLock.Lock() + defer HandshakeNet.requestedLock.Unlock() + conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn)) addr := conn.GetAddr() ip := toIPDesc(addr) - ipStr := ip.String() - if _, exists := HandshakeNet.requestedConnections[ipStr]; !exists { + + ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr))) + HandshakeNet.requestedTimeout.Remove(ipID) + + if _, exists := HandshakeNet.requested[ipStr]; !exists { HandshakeNet.log.Debug("connHandler called with %s", ip) return true } - delete(HandshakeNet.requestedConnections, ipStr) + delete(HandshakeNet.requested, ipStr) cert := conn.GetPeerCert() peer := salticidae.NewPeerIDFromX509(cert, true) - HandshakeNet.ConnectTo(peer, addr) + HandshakeNet.ConnectTo(peer, getCert(cert), addr) return true } @@ -434,8 +454,6 @@ func (nm *Handshake) connectedToPeer(conn *C.struct_peernetwork_conn_t, peer sal nm.reconnectTimeout.Remove(peerID) - nm.pending.Add(peer, cert, utils.IPDesc{}) - handler := new(func()) *handler = func() { if nm.pending.ContainsPeerID(peer) { @@ -461,23 +479,28 @@ func (nm *Handshake) disconnectedFromPeer(peer salticidae.PeerID) { peerBytes := toID(peer) peerID := ids.NewID(peerBytes) + nm.versionTimeout.Remove(peerID) + nm.connections.Remove(peer, cert) + nm.numPeers.Set(float64(nm.connections.Len())) + if nm.vdrs.Contains(cert) { nm.reconnectTimeout.Put(peerID, func() { + nm.pending.Remove(peer, cert) + nm.connections.Remove(peer, cert) nm.net.DelPeer(peer) + + nm.numPeers.Set(float64(nm.connections.Len())) }) + nm.pending.Add(peer, cert, utils.IPDesc{}) } else { + nm.pending.Remove(peer, cert) nm.net.DelPeer(peer) } - nm.versionTimeout.Remove(peerID) if !nm.enableStaking { nm.vdrs.Remove(cert) } - nm.pending.RemovePeerID(peer) - nm.connections.RemovePeerID(peer) - nm.numPeers.Set(float64(nm.connections.Len())) - nm.awaitingLock.Lock() defer nm.awaitingLock.Unlock() for _, awaiting := range HandshakeNet.awaiting { @@ -513,19 +536,27 @@ func unknownPeerHandler(_addr *C.netaddr_t, _cert *C.x509_t, _ unsafe.Pointer) { HandshakeNet.log.Info("Adding peer %s", ip) var peer salticidae.PeerID + var id ids.ShortID if HandshakeNet.enableStaking { cert := salticidae.X509FromC(salticidae.CX509(_cert)) peer = salticidae.NewPeerIDFromX509(cert, true) + id = getCert(cert) } else { peer = salticidae.NewPeerIDFromNetAddr(addr, true) + id = toShortID(ip) } peerBytes := toID(peer) peerID := ids.NewID(peerBytes) HandshakeNet.reconnectTimeout.Put(peerID, func() { + HandshakeNet.pending.Remove(peer, id) + HandshakeNet.connections.Remove(peer, id) HandshakeNet.net.DelPeer(peer) + + HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len())) }) + HandshakeNet.pending.Add(peer, id, utils.IPDesc{}) HandshakeNet.net.AddPeer(peer) } @@ -568,12 +599,17 @@ func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.P conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn)) peer := conn.GetPeerID(true) + peerBytes := toID(peer) + peerID := ids.NewID(peerBytes) + + HandshakeNet.versionTimeout.Remove(peerID) + id, exists := HandshakeNet.pending.GetID(peer) if !exists { + HandshakeNet.log.Warn("Dropping Version message because the peer isn't pending") return } - - defer HandshakeNet.pending.Remove(peer, id) + HandshakeNet.pending.Remove(peer, id) build := Builder{} pMsg, err := build.Parse(Version, msg.GetPayloadByMove()) @@ -612,18 +648,12 @@ func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.P HandshakeNet.SendPeerList(peer) HandshakeNet.connections.Add(peer, id, ip) - - peerBytes := toID(peer) - peerID := ids.NewID(peerBytes) - - HandshakeNet.versionTimeout.Remove(peerID) + HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len())) if !HandshakeNet.enableStaking { HandshakeNet.vdrs.Add(validators.NewValidator(id, 1)) } - HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len())) - HandshakeNet.awaitingLock.Lock() defer HandshakeNet.awaitingLock.Unlock() @@ -706,3 +736,7 @@ func checkCompatibility(myVersion string, peerVersion string) bool { // At the moment, we are all compatible. return true } + +func toShortID(ip utils.IPDesc) ids.ShortID { + return ids.NewShortID(hashing.ComputeHash160Array([]byte(ip.String()))) +}