This commit is contained in:
StephenButtolph 2020-04-20 22:33:33 -04:00
parent 3c9187fc7b
commit 2c76dd1954
1 changed files with 62 additions and 28 deletions

View File

@ -114,16 +114,16 @@ type Handshake struct {
// Connections that I have added by IP, but haven't gotten an ID from
requestedLock sync.Mutex
requested map[string]struct{}
requestedTimeout timer.TimeoutManager
requestedTimeout timer.TimeoutManager // keys are hashes of the ip:port string
// Connections that I have added as a peer, but haven't gotten a version
// message from
pending Connections
versionTimeout timer.TimeoutManager
versionTimeout timer.TimeoutManager // keys are the peer IDs
// Connections that I have gotten a valid version message from
connections Connections
reconnectTimeout timer.TimeoutManager
reconnectTimeout timer.TimeoutManager // keys are the peer IDs
// IPs of nodes I'm connected to will be repeatedly gossiped throughout the network
peerListGossiper *timer.Repeater
@ -193,6 +193,8 @@ func (nm *Handshake) ConnectTo(peer salticidae.PeerID, stakerID ids.ShortID, add
return
}
nm.log.Info("Attempting to connect to %s", stakerID)
nm.net.AddPeer(peer)
nm.net.SetPeerAddr(peer, addr)
nm.net.ConnPeer(peer, 600, 1)
@ -205,7 +207,10 @@ func (nm *Handshake) ConnectTo(peer salticidae.PeerID, stakerID ids.ShortID, add
nm.reconnectTimeout.Put(peerID, func() {
nm.pending.Remove(peer, stakerID)
nm.connections.Remove(peer, stakerID)
nm.net.DelPeer(peer)
nm.numPeers.Set(float64(nm.connections.Len()))
})
}
@ -217,11 +222,11 @@ func (nm *Handshake) Connect(addr salticidae.NetAddr) {
return
}
nm.log.Debug("Adding peer %s", ip)
nm.log.Info("Adding peer %s", ip)
if !nm.enableStaking {
peer := salticidae.NewPeerIDFromNetAddr(addr, true)
nm.ConnectTo(peer, addr)
nm.ConnectTo(peer, toShortID(ip), addr)
return
}
@ -232,6 +237,14 @@ func (nm *Handshake) Connect(addr salticidae.NetAddr) {
nm.requestedLock.Lock()
defer nm.requestedLock.Unlock()
if *count == 600 {
nm.requested[ipStr] = struct{}{}
}
if _, exists := nm.requested[ipStr]; !exists {
return
}
if *count <= 0 {
delete(nm.requested, ipStr)
return
@ -242,12 +255,13 @@ func (nm *Handshake) Connect(addr salticidae.NetAddr) {
return
}
nm.log.Verbo("Attempting to discover peer at %s", ipStr)
nm.requested[ipStr] = struct{}{}
nm.log.Info("Attempting to discover peer at %s", ipStr)
msgNet := nm.net.AsMsgNetwork()
msgNet.Connect(addr)
ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr)))
nm.requestedTimeout.Put(ipID, *handler)
}
(*handler)()
}
@ -397,21 +411,27 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.P
return connected
}
HandshakeNet.requestedLock.Lock()
defer HandshakeNet.requestedLock.Unlock()
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
addr := conn.GetAddr()
ip := toIPDesc(addr)
ipStr := ip.String()
if _, exists := HandshakeNet.requestedConnections[ipStr]; !exists {
ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr)))
HandshakeNet.requestedTimeout.Remove(ipID)
if _, exists := HandshakeNet.requested[ipStr]; !exists {
HandshakeNet.log.Debug("connHandler called with %s", ip)
return true
}
delete(HandshakeNet.requestedConnections, ipStr)
delete(HandshakeNet.requested, ipStr)
cert := conn.GetPeerCert()
peer := salticidae.NewPeerIDFromX509(cert, true)
HandshakeNet.ConnectTo(peer, addr)
HandshakeNet.ConnectTo(peer, getCert(cert), addr)
return true
}
@ -434,8 +454,6 @@ func (nm *Handshake) connectedToPeer(conn *C.struct_peernetwork_conn_t, peer sal
nm.reconnectTimeout.Remove(peerID)
nm.pending.Add(peer, cert, utils.IPDesc{})
handler := new(func())
*handler = func() {
if nm.pending.ContainsPeerID(peer) {
@ -461,23 +479,28 @@ func (nm *Handshake) disconnectedFromPeer(peer salticidae.PeerID) {
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
nm.versionTimeout.Remove(peerID)
nm.connections.Remove(peer, cert)
nm.numPeers.Set(float64(nm.connections.Len()))
if nm.vdrs.Contains(cert) {
nm.reconnectTimeout.Put(peerID, func() {
nm.pending.Remove(peer, cert)
nm.connections.Remove(peer, cert)
nm.net.DelPeer(peer)
nm.numPeers.Set(float64(nm.connections.Len()))
})
nm.pending.Add(peer, cert, utils.IPDesc{})
} else {
nm.pending.Remove(peer, cert)
nm.net.DelPeer(peer)
}
nm.versionTimeout.Remove(peerID)
if !nm.enableStaking {
nm.vdrs.Remove(cert)
}
nm.pending.RemovePeerID(peer)
nm.connections.RemovePeerID(peer)
nm.numPeers.Set(float64(nm.connections.Len()))
nm.awaitingLock.Lock()
defer nm.awaitingLock.Unlock()
for _, awaiting := range HandshakeNet.awaiting {
@ -513,19 +536,27 @@ func unknownPeerHandler(_addr *C.netaddr_t, _cert *C.x509_t, _ unsafe.Pointer) {
HandshakeNet.log.Info("Adding peer %s", ip)
var peer salticidae.PeerID
var id ids.ShortID
if HandshakeNet.enableStaking {
cert := salticidae.X509FromC(salticidae.CX509(_cert))
peer = salticidae.NewPeerIDFromX509(cert, true)
id = getCert(cert)
} else {
peer = salticidae.NewPeerIDFromNetAddr(addr, true)
id = toShortID(ip)
}
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
HandshakeNet.reconnectTimeout.Put(peerID, func() {
HandshakeNet.pending.Remove(peer, id)
HandshakeNet.connections.Remove(peer, id)
HandshakeNet.net.DelPeer(peer)
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
})
HandshakeNet.pending.Add(peer, id, utils.IPDesc{})
HandshakeNet.net.AddPeer(peer)
}
@ -568,12 +599,17 @@ func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.P
conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
peer := conn.GetPeerID(true)
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
HandshakeNet.versionTimeout.Remove(peerID)
id, exists := HandshakeNet.pending.GetID(peer)
if !exists {
HandshakeNet.log.Warn("Dropping Version message because the peer isn't pending")
return
}
defer HandshakeNet.pending.Remove(peer, id)
HandshakeNet.pending.Remove(peer, id)
build := Builder{}
pMsg, err := build.Parse(Version, msg.GetPayloadByMove())
@ -612,18 +648,12 @@ func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.P
HandshakeNet.SendPeerList(peer)
HandshakeNet.connections.Add(peer, id, ip)
peerBytes := toID(peer)
peerID := ids.NewID(peerBytes)
HandshakeNet.versionTimeout.Remove(peerID)
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
if !HandshakeNet.enableStaking {
HandshakeNet.vdrs.Add(validators.NewValidator(id, 1))
}
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
HandshakeNet.awaitingLock.Lock()
defer HandshakeNet.awaitingLock.Unlock()
@ -706,3 +736,7 @@ func checkCompatibility(myVersion string, peerVersion string) bool {
// At the moment, we are all compatible.
return true
}
func toShortID(ip utils.IPDesc) ids.ShortID {
return ids.NewShortID(hashing.ComputeHash160Array([]byte(ip.String())))
}