mirror of https://github.com/poanetwork/gecko.git
Merge pull request #90 from StephenButtolph/retry-connections
Retry connections
This commit is contained in:
commit
d9c3c31c4f
|
@ -194,8 +194,11 @@ func (c *connections) remove(peer salticidae.PeerID, id ids.ShortID) {
|
||||||
func (c *connections) removePeerID(peer salticidae.PeerID) {
|
func (c *connections) removePeerID(peer salticidae.PeerID) {
|
||||||
peerID := toID(peer)
|
peerID := toID(peer)
|
||||||
if id, exists := c.peerIDToID[peerID]; exists {
|
if id, exists := c.peerIDToID[peerID]; exists {
|
||||||
|
idKey := id.Key()
|
||||||
|
|
||||||
delete(c.peerIDToID, peerID)
|
delete(c.peerIDToID, peerID)
|
||||||
delete(c.idToPeerID, id.Key())
|
delete(c.idToPeerID, idKey)
|
||||||
|
delete(c.idToIP, idKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -204,6 +207,7 @@ func (c *connections) removeID(id ids.ShortID) {
|
||||||
if peer, exists := c.idToPeerID[idKey]; exists {
|
if peer, exists := c.idToPeerID[idKey]; exists {
|
||||||
delete(c.peerIDToID, toID(peer))
|
delete(c.peerIDToID, toID(peer))
|
||||||
delete(c.idToPeerID, idKey)
|
delete(c.idToPeerID, idKey)
|
||||||
|
delete(c.idToIP, idKey)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -74,6 +74,10 @@ const (
|
||||||
// PeerListStakerGossipFraction calculates the fraction of stakers that are
|
// PeerListStakerGossipFraction calculates the fraction of stakers that are
|
||||||
// gossiped to. If set to 1, then only stakers will be gossiped to.
|
// gossiped to. If set to 1, then only stakers will be gossiped to.
|
||||||
PeerListStakerGossipFraction = 2
|
PeerListStakerGossipFraction = 2
|
||||||
|
|
||||||
|
// ConnectTimeout is the amount of time to wait before attempt to connect to
|
||||||
|
// an unknown peer
|
||||||
|
ConnectTimeout = 6 * time.Second
|
||||||
// GetVersionTimeout is the amount of time to wait before sending a
|
// GetVersionTimeout is the amount of time to wait before sending a
|
||||||
// getVersion message to a partially connected peer
|
// getVersion message to a partially connected peer
|
||||||
GetVersionTimeout = 2 * time.Second
|
GetVersionTimeout = 2 * time.Second
|
||||||
|
@ -96,27 +100,37 @@ var (
|
||||||
type Handshake struct {
|
type Handshake struct {
|
||||||
handshakeMetrics
|
handshakeMetrics
|
||||||
|
|
||||||
networkID uint32
|
networkID uint32 // ID of the network I'm running, used to prevent connecting to the wrong network
|
||||||
|
|
||||||
log logging.Logger
|
log logging.Logger
|
||||||
vdrs validators.Set
|
vdrs validators.Set // set of current validators in the AVAnet
|
||||||
myAddr salticidae.NetAddr
|
myAddr salticidae.NetAddr // IP I communicate to peers
|
||||||
myID ids.ShortID
|
myID ids.ShortID // ID that identifies myself as a staker or not
|
||||||
net salticidae.PeerNetwork
|
net salticidae.PeerNetwork // C messaging network
|
||||||
enableStaking bool // Should only be false for local tests
|
enableStaking bool // Should only be false for local tests
|
||||||
|
|
||||||
clock timer.Clock
|
clock timer.Clock
|
||||||
pending Connections // Connections that I haven't gotten version messages from
|
|
||||||
connections Connections // Connections that I think are connected
|
|
||||||
|
|
||||||
versionTimeout timer.TimeoutManager
|
// Connections that I have added by IP, but haven't gotten an ID from
|
||||||
reconnectTimeout timer.TimeoutManager
|
requestedLock sync.Mutex
|
||||||
|
requested map[string]struct{}
|
||||||
|
requestedTimeout timer.TimeoutManager // keys are hashes of the ip:port string
|
||||||
|
|
||||||
|
// Connections that I have added as a peer, but haven't gotten a version
|
||||||
|
// message from
|
||||||
|
pending Connections
|
||||||
|
versionTimeout timer.TimeoutManager // keys are the peer IDs
|
||||||
|
|
||||||
|
// Connections that I have gotten a valid version message from
|
||||||
|
connections Connections
|
||||||
|
reconnectTimeout timer.TimeoutManager // keys are the peer IDs
|
||||||
|
|
||||||
|
// IPs of nodes I'm connected to will be repeatedly gossiped throughout the network
|
||||||
peerListGossiper *timer.Repeater
|
peerListGossiper *timer.Repeater
|
||||||
|
|
||||||
|
// If any chain is blocked on connecting to peers, track these blockers here
|
||||||
awaitingLock sync.Mutex
|
awaitingLock sync.Mutex
|
||||||
awaiting []*networking.AwaitingConnections
|
awaiting []*networking.AwaitingConnections
|
||||||
|
|
||||||
requestedConnections map[string]struct{}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize to the c networking library. This should only be done once during
|
// Initialize to the c networking library. This should only be done once during
|
||||||
|
@ -132,19 +146,34 @@ func (nm *Handshake) Initialize(
|
||||||
networkID uint32,
|
networkID uint32,
|
||||||
) {
|
) {
|
||||||
log.AssertTrue(nm.net == nil, "Should only register network handlers once")
|
log.AssertTrue(nm.net == nil, "Should only register network handlers once")
|
||||||
|
|
||||||
|
nm.handshakeMetrics.Initialize(log, registerer)
|
||||||
|
|
||||||
|
nm.networkID = networkID
|
||||||
|
|
||||||
nm.log = log
|
nm.log = log
|
||||||
nm.vdrs = vdrs
|
nm.vdrs = vdrs
|
||||||
nm.myAddr = myAddr
|
nm.myAddr = myAddr
|
||||||
nm.myID = myID
|
nm.myID = myID
|
||||||
nm.net = peerNet
|
nm.net = peerNet
|
||||||
nm.enableStaking = enableStaking
|
nm.enableStaking = enableStaking
|
||||||
nm.networkID = networkID
|
|
||||||
|
nm.requested = make(map[string]struct{})
|
||||||
|
nm.requestedTimeout.Initialize(ConnectTimeout)
|
||||||
|
go nm.log.RecoverAndPanic(nm.requestedTimeout.Dispatch)
|
||||||
|
|
||||||
nm.pending = NewConnections()
|
nm.pending = NewConnections()
|
||||||
|
nm.versionTimeout.Initialize(GetVersionTimeout)
|
||||||
|
go nm.log.RecoverAndPanic(nm.versionTimeout.Dispatch)
|
||||||
|
|
||||||
nm.connections = NewConnections()
|
nm.connections = NewConnections()
|
||||||
|
nm.reconnectTimeout.Initialize(ReconnectTimeout)
|
||||||
|
go nm.log.RecoverAndPanic(nm.reconnectTimeout.Dispatch)
|
||||||
|
|
||||||
nm.requestedConnections = make(map[string]struct{})
|
nm.peerListGossiper = timer.NewRepeater(nm.gossipPeerList, PeerListGossipSpacing)
|
||||||
|
go nm.log.RecoverAndPanic(nm.peerListGossiper.Dispatch)
|
||||||
|
|
||||||
|
// register c message callbacks
|
||||||
net := peerNet.AsMsgNetwork()
|
net := peerNet.AsMsgNetwork()
|
||||||
|
|
||||||
net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), nil)
|
net.RegConnHandler(salticidae.MsgNetworkConnCallback(C.connHandler), nil)
|
||||||
|
@ -156,34 +185,95 @@ func (nm *Handshake) Initialize(
|
||||||
net.RegHandler(Version, salticidae.MsgNetworkMsgCallback(C.version), nil)
|
net.RegHandler(Version, salticidae.MsgNetworkMsgCallback(C.version), nil)
|
||||||
net.RegHandler(GetPeerList, salticidae.MsgNetworkMsgCallback(C.getPeerList), nil)
|
net.RegHandler(GetPeerList, salticidae.MsgNetworkMsgCallback(C.getPeerList), nil)
|
||||||
net.RegHandler(PeerList, salticidae.MsgNetworkMsgCallback(C.peerList), nil)
|
net.RegHandler(PeerList, salticidae.MsgNetworkMsgCallback(C.peerList), nil)
|
||||||
|
}
|
||||||
|
|
||||||
nm.handshakeMetrics.Initialize(nm.log, registerer)
|
// ConnectTo add the peer as a connection and connects to them.
|
||||||
|
func (nm *Handshake) ConnectTo(peer salticidae.PeerID, stakerID ids.ShortID, addr salticidae.NetAddr) {
|
||||||
|
if nm.pending.ContainsPeerID(peer) || nm.connections.ContainsPeerID(peer) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
nm.versionTimeout.Initialize(GetVersionTimeout)
|
nm.log.Info("Attempting to connect to %s", stakerID)
|
||||||
go nm.log.RecoverAndPanic(nm.versionTimeout.Dispatch)
|
|
||||||
|
|
||||||
nm.reconnectTimeout.Initialize(ReconnectTimeout)
|
nm.net.AddPeer(peer)
|
||||||
go nm.log.RecoverAndPanic(nm.reconnectTimeout.Dispatch)
|
nm.net.SetPeerAddr(peer, addr)
|
||||||
|
nm.net.ConnPeer(peer, 600, 1)
|
||||||
|
|
||||||
nm.peerListGossiper = timer.NewRepeater(nm.gossipPeerList, PeerListGossipSpacing)
|
ip := toIPDesc(addr)
|
||||||
go nm.log.RecoverAndPanic(nm.peerListGossiper.Dispatch)
|
nm.pending.Add(peer, stakerID, ip)
|
||||||
|
|
||||||
|
peerBytes := toID(peer)
|
||||||
|
peerID := ids.NewID(peerBytes)
|
||||||
|
|
||||||
|
nm.reconnectTimeout.Put(peerID, func() {
|
||||||
|
nm.pending.Remove(peer, stakerID)
|
||||||
|
nm.connections.Remove(peer, stakerID)
|
||||||
|
nm.net.DelPeer(peer)
|
||||||
|
|
||||||
|
nm.numPeers.Set(float64(nm.connections.Len()))
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect ...
|
// Connect ...
|
||||||
func (nm *Handshake) Connect(addr salticidae.NetAddr) {
|
func (nm *Handshake) Connect(addr salticidae.NetAddr) {
|
||||||
if !nm.enableStaking {
|
|
||||||
peer := salticidae.NewPeerIDFromNetAddr(addr, false)
|
|
||||||
nm.net.AddPeer(peer)
|
|
||||||
nm.net.SetPeerAddr(peer, addr)
|
|
||||||
nm.net.ConnPeer(peer, 600, 1)
|
|
||||||
peer.Free()
|
|
||||||
} else {
|
|
||||||
ip := toIPDesc(addr)
|
ip := toIPDesc(addr)
|
||||||
nm.requestedConnections[ip.String()] = struct{}{}
|
ipStr := ip.String()
|
||||||
|
if nm.pending.ContainsIP(ip) || nm.connections.ContainsIP(ip) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if !nm.enableStaking {
|
||||||
|
nm.log.Info("Adding peer %s", ip)
|
||||||
|
|
||||||
|
peer := salticidae.NewPeerIDFromNetAddr(addr, true)
|
||||||
|
nm.ConnectTo(peer, toShortID(ip), addr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nm.requestedLock.Lock()
|
||||||
|
_, exists := nm.requested[ipStr]
|
||||||
|
nm.requestedLock.Unlock()
|
||||||
|
|
||||||
|
if exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nm.log.Info("Adding peer %s", ip)
|
||||||
|
|
||||||
|
count := new(int)
|
||||||
|
*count = 100
|
||||||
|
handler := new(func())
|
||||||
|
*handler = func() {
|
||||||
|
nm.requestedLock.Lock()
|
||||||
|
defer nm.requestedLock.Unlock()
|
||||||
|
|
||||||
|
if *count == 100 {
|
||||||
|
nm.requested[ipStr] = struct{}{}
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, exists := nm.requested[ipStr]; !exists {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if *count <= 0 {
|
||||||
|
delete(nm.requested, ipStr)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
*count--
|
||||||
|
|
||||||
|
if nm.pending.ContainsIP(ip) || nm.connections.ContainsIP(ip) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
nm.log.Info("Attempting to discover peer at %s", ipStr)
|
||||||
|
|
||||||
msgNet := nm.net.AsMsgNetwork()
|
msgNet := nm.net.AsMsgNetwork()
|
||||||
msgNet.Connect(addr)
|
msgNet.Connect(addr)
|
||||||
|
|
||||||
|
ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr)))
|
||||||
|
nm.requestedTimeout.Put(ipID, *handler)
|
||||||
}
|
}
|
||||||
|
(*handler)()
|
||||||
}
|
}
|
||||||
|
|
||||||
// AwaitConnections ...
|
// AwaitConnections ...
|
||||||
|
@ -331,25 +421,27 @@ func connHandler(_conn *C.struct_msgnetwork_conn_t, connected C.bool, _ unsafe.P
|
||||||
return connected
|
return connected
|
||||||
}
|
}
|
||||||
|
|
||||||
|
HandshakeNet.requestedLock.Lock()
|
||||||
|
defer HandshakeNet.requestedLock.Unlock()
|
||||||
|
|
||||||
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
|
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
|
||||||
addr := conn.GetAddr()
|
addr := conn.GetAddr()
|
||||||
ip := toIPDesc(addr)
|
ip := toIPDesc(addr)
|
||||||
|
|
||||||
ipStr := ip.String()
|
ipStr := ip.String()
|
||||||
if _, exists := HandshakeNet.requestedConnections[ipStr]; !exists {
|
|
||||||
|
ipID := ids.NewID(hashing.ComputeHash256Array([]byte(ipStr)))
|
||||||
|
HandshakeNet.requestedTimeout.Remove(ipID)
|
||||||
|
|
||||||
|
if _, exists := HandshakeNet.requested[ipStr]; !exists {
|
||||||
HandshakeNet.log.Debug("connHandler called with %s", ip)
|
HandshakeNet.log.Debug("connHandler called with %s", ip)
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
delete(HandshakeNet.requestedConnections, ipStr)
|
delete(HandshakeNet.requested, ipStr)
|
||||||
|
|
||||||
cert := conn.GetPeerCert()
|
cert := conn.GetPeerCert()
|
||||||
peer := salticidae.NewPeerIDFromX509(cert, false)
|
peer := salticidae.NewPeerIDFromX509(cert, true)
|
||||||
|
|
||||||
HandshakeNet.net.AddPeer(peer)
|
HandshakeNet.ConnectTo(peer, getCert(cert), addr)
|
||||||
HandshakeNet.net.SetPeerAddr(peer, addr)
|
|
||||||
HandshakeNet.net.ConnPeer(peer, 600, 1)
|
|
||||||
|
|
||||||
peer.Free()
|
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -372,8 +464,6 @@ func (nm *Handshake) connectedToPeer(conn *C.struct_peernetwork_conn_t, peer sal
|
||||||
|
|
||||||
nm.reconnectTimeout.Remove(peerID)
|
nm.reconnectTimeout.Remove(peerID)
|
||||||
|
|
||||||
nm.pending.Add(peer, cert, utils.IPDesc{})
|
|
||||||
|
|
||||||
handler := new(func())
|
handler := new(func())
|
||||||
*handler = func() {
|
*handler = func() {
|
||||||
if nm.pending.ContainsPeerID(peer) {
|
if nm.pending.ContainsPeerID(peer) {
|
||||||
|
@ -388,34 +478,39 @@ func (nm *Handshake) disconnectedFromPeer(peer salticidae.PeerID) {
|
||||||
cert := ids.ShortID{}
|
cert := ids.ShortID{}
|
||||||
if pendingCert, exists := nm.pending.GetID(peer); exists {
|
if pendingCert, exists := nm.pending.GetID(peer); exists {
|
||||||
cert = pendingCert
|
cert = pendingCert
|
||||||
|
nm.log.Info("Disconnected from pending peer %s", cert)
|
||||||
} else if connectedCert, exists := nm.connections.GetID(peer); exists {
|
} else if connectedCert, exists := nm.connections.GetID(peer); exists {
|
||||||
cert = connectedCert
|
cert = connectedCert
|
||||||
|
nm.log.Info("Disconnected from peer %s", cert)
|
||||||
} else {
|
} else {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
nm.log.Info("Disconnected from %s", cert)
|
|
||||||
|
|
||||||
peerBytes := toID(peer)
|
peerBytes := toID(peer)
|
||||||
peerID := ids.NewID(peerBytes)
|
peerID := ids.NewID(peerBytes)
|
||||||
|
|
||||||
|
nm.versionTimeout.Remove(peerID)
|
||||||
|
nm.connections.Remove(peer, cert)
|
||||||
|
nm.numPeers.Set(float64(nm.connections.Len()))
|
||||||
|
|
||||||
if nm.vdrs.Contains(cert) {
|
if nm.vdrs.Contains(cert) {
|
||||||
nm.reconnectTimeout.Put(peerID, func() {
|
nm.reconnectTimeout.Put(peerID, func() {
|
||||||
|
nm.pending.Remove(peer, cert)
|
||||||
|
nm.connections.Remove(peer, cert)
|
||||||
nm.net.DelPeer(peer)
|
nm.net.DelPeer(peer)
|
||||||
|
|
||||||
|
nm.numPeers.Set(float64(nm.connections.Len()))
|
||||||
})
|
})
|
||||||
|
nm.pending.Add(peer, cert, utils.IPDesc{})
|
||||||
} else {
|
} else {
|
||||||
|
nm.pending.Remove(peer, cert)
|
||||||
nm.net.DelPeer(peer)
|
nm.net.DelPeer(peer)
|
||||||
}
|
}
|
||||||
nm.versionTimeout.Remove(peerID)
|
|
||||||
|
|
||||||
if !nm.enableStaking {
|
if !nm.enableStaking {
|
||||||
nm.vdrs.Remove(cert)
|
nm.vdrs.Remove(cert)
|
||||||
}
|
}
|
||||||
|
|
||||||
nm.pending.RemovePeerID(peer)
|
|
||||||
nm.connections.RemovePeerID(peer)
|
|
||||||
nm.numPeers.Set(float64(nm.connections.Len()))
|
|
||||||
|
|
||||||
nm.awaitingLock.Lock()
|
nm.awaitingLock.Lock()
|
||||||
defer nm.awaitingLock.Unlock()
|
defer nm.awaitingLock.Unlock()
|
||||||
for _, awaiting := range HandshakeNet.awaiting {
|
for _, awaiting := range HandshakeNet.awaiting {
|
||||||
|
@ -451,19 +546,27 @@ func unknownPeerHandler(_addr *C.netaddr_t, _cert *C.x509_t, _ unsafe.Pointer) {
|
||||||
HandshakeNet.log.Info("Adding peer %s", ip)
|
HandshakeNet.log.Info("Adding peer %s", ip)
|
||||||
|
|
||||||
var peer salticidae.PeerID
|
var peer salticidae.PeerID
|
||||||
|
var id ids.ShortID
|
||||||
if HandshakeNet.enableStaking {
|
if HandshakeNet.enableStaking {
|
||||||
cert := salticidae.X509FromC(salticidae.CX509(_cert))
|
cert := salticidae.X509FromC(salticidae.CX509(_cert))
|
||||||
peer = salticidae.NewPeerIDFromX509(cert, true)
|
peer = salticidae.NewPeerIDFromX509(cert, true)
|
||||||
|
id = getCert(cert)
|
||||||
} else {
|
} else {
|
||||||
peer = salticidae.NewPeerIDFromNetAddr(addr, true)
|
peer = salticidae.NewPeerIDFromNetAddr(addr, true)
|
||||||
|
id = toShortID(ip)
|
||||||
}
|
}
|
||||||
|
|
||||||
peerBytes := toID(peer)
|
peerBytes := toID(peer)
|
||||||
peerID := ids.NewID(peerBytes)
|
peerID := ids.NewID(peerBytes)
|
||||||
|
|
||||||
HandshakeNet.reconnectTimeout.Put(peerID, func() {
|
HandshakeNet.reconnectTimeout.Put(peerID, func() {
|
||||||
|
HandshakeNet.pending.Remove(peer, id)
|
||||||
|
HandshakeNet.connections.Remove(peer, id)
|
||||||
HandshakeNet.net.DelPeer(peer)
|
HandshakeNet.net.DelPeer(peer)
|
||||||
|
|
||||||
|
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
|
||||||
})
|
})
|
||||||
|
HandshakeNet.pending.Add(peer, id, utils.IPDesc{})
|
||||||
HandshakeNet.net.AddPeer(peer)
|
HandshakeNet.net.AddPeer(peer)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -506,12 +609,17 @@ func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.P
|
||||||
conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
|
conn := salticidae.PeerNetworkConnFromC(salticidae.CPeerNetworkConn(_conn))
|
||||||
peer := conn.GetPeerID(true)
|
peer := conn.GetPeerID(true)
|
||||||
|
|
||||||
|
peerBytes := toID(peer)
|
||||||
|
peerID := ids.NewID(peerBytes)
|
||||||
|
|
||||||
|
HandshakeNet.versionTimeout.Remove(peerID)
|
||||||
|
|
||||||
id, exists := HandshakeNet.pending.GetID(peer)
|
id, exists := HandshakeNet.pending.GetID(peer)
|
||||||
if !exists {
|
if !exists {
|
||||||
|
HandshakeNet.log.Warn("Dropping Version message because the peer isn't pending")
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
HandshakeNet.pending.Remove(peer, id)
|
||||||
defer HandshakeNet.pending.Remove(peer, id)
|
|
||||||
|
|
||||||
build := Builder{}
|
build := Builder{}
|
||||||
pMsg, err := build.Parse(Version, msg.GetPayloadByMove())
|
pMsg, err := build.Parse(Version, msg.GetPayloadByMove())
|
||||||
|
@ -550,18 +658,12 @@ func version(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.P
|
||||||
|
|
||||||
HandshakeNet.SendPeerList(peer)
|
HandshakeNet.SendPeerList(peer)
|
||||||
HandshakeNet.connections.Add(peer, id, ip)
|
HandshakeNet.connections.Add(peer, id, ip)
|
||||||
|
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
|
||||||
peerBytes := toID(peer)
|
|
||||||
peerID := ids.NewID(peerBytes)
|
|
||||||
|
|
||||||
HandshakeNet.versionTimeout.Remove(peerID)
|
|
||||||
|
|
||||||
if !HandshakeNet.enableStaking {
|
if !HandshakeNet.enableStaking {
|
||||||
HandshakeNet.vdrs.Add(validators.NewValidator(id, 1))
|
HandshakeNet.vdrs.Add(validators.NewValidator(id, 1))
|
||||||
}
|
}
|
||||||
|
|
||||||
HandshakeNet.numPeers.Set(float64(HandshakeNet.connections.Len()))
|
|
||||||
|
|
||||||
HandshakeNet.awaitingLock.Lock()
|
HandshakeNet.awaitingLock.Lock()
|
||||||
defer HandshakeNet.awaitingLock.Unlock()
|
defer HandshakeNet.awaitingLock.Unlock()
|
||||||
|
|
||||||
|
@ -612,20 +714,14 @@ func peerList(_msg *C.struct_msg_t, _conn *C.struct_msgnetwork_conn_t, _ unsafe.
|
||||||
cErr := salticidae.NewError()
|
cErr := salticidae.NewError()
|
||||||
for _, ip := range ips {
|
for _, ip := range ips {
|
||||||
addr := salticidae.NewNetAddrFromIPPortString(ip.String(), true, &cErr)
|
addr := salticidae.NewNetAddrFromIPPortString(ip.String(), true, &cErr)
|
||||||
if cErr.GetCode() == 0 && !HandshakeNet.myAddr.IsEq(addr) { // Make sure not to connect to myself
|
|
||||||
ip := toIPDesc(addr)
|
if cErr.GetCode() != 0 || HandshakeNet.myAddr.IsEq(addr) {
|
||||||
if !HandshakeNet.pending.ContainsIP(ip) && !HandshakeNet.connections.ContainsIP(ip) {
|
// Make sure not to connect to myself
|
||||||
HandshakeNet.log.Debug("Adding peer %s", ip)
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
HandshakeNet.Connect(addr)
|
HandshakeNet.Connect(addr)
|
||||||
}
|
}
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func getMsgCert(_conn *C.struct_msgnetwork_conn_t) ids.ShortID {
|
|
||||||
conn := salticidae.MsgNetworkConnFromC(salticidae.CMsgNetworkConn(_conn))
|
|
||||||
return getCert(conn.GetPeerCert())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func getPeerCert(_conn *C.struct_peernetwork_conn_t) ids.ShortID {
|
func getPeerCert(_conn *C.struct_peernetwork_conn_t) ids.ShortID {
|
||||||
|
@ -635,13 +731,12 @@ func getPeerCert(_conn *C.struct_peernetwork_conn_t) ids.ShortID {
|
||||||
|
|
||||||
func getCert(cert salticidae.X509) ids.ShortID {
|
func getCert(cert salticidae.X509) ids.ShortID {
|
||||||
der := cert.GetDer(false)
|
der := cert.GetDer(false)
|
||||||
defer der.Free()
|
|
||||||
|
|
||||||
certDS := salticidae.NewDataStreamMovedFromByteArray(der, false)
|
certDS := salticidae.NewDataStreamMovedFromByteArray(der, false)
|
||||||
defer certDS.Free()
|
|
||||||
|
|
||||||
certBytes := certDS.GetDataInPlace(certDS.Size()).Get()
|
certBytes := certDS.GetDataInPlace(certDS.Size()).Get()
|
||||||
certID, err := ids.ToShortID(hashing.PubkeyBytesToAddress(certBytes))
|
certID, err := ids.ToShortID(hashing.PubkeyBytesToAddress(certBytes))
|
||||||
|
|
||||||
|
certDS.Free()
|
||||||
|
der.Free()
|
||||||
HandshakeNet.log.AssertNoError(err)
|
HandshakeNet.log.AssertNoError(err)
|
||||||
return certID
|
return certID
|
||||||
}
|
}
|
||||||
|
@ -652,12 +747,6 @@ func checkCompatibility(myVersion string, peerVersion string) bool {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
func toAddr(ip utils.IPDesc, autoFree bool) salticidae.NetAddr {
|
|
||||||
err := salticidae.NewError()
|
|
||||||
addr := salticidae.NewNetAddrFromIPPortString(ip.String(), autoFree, &err)
|
|
||||||
HandshakeNet.log.AssertTrue(err.GetCode() == 0, "IP Failed parsing")
|
|
||||||
return addr
|
|
||||||
}
|
|
||||||
func toShortID(ip utils.IPDesc) ids.ShortID {
|
func toShortID(ip utils.IPDesc) ids.ShortID {
|
||||||
return ids.NewShortID(hashing.ComputeHash160Array([]byte(ip.String())))
|
return ids.NewShortID(hashing.ComputeHash160Array([]byte(ip.String())))
|
||||||
}
|
}
|
||||||
|
|
12
node/node.go
12
node/node.go
|
@ -5,7 +5,7 @@ package node
|
||||||
|
|
||||||
// #include "salticidae/network.h"
|
// #include "salticidae/network.h"
|
||||||
// void onTerm(int sig, void *);
|
// void onTerm(int sig, void *);
|
||||||
// void errorHandler(SalticidaeCError *, bool, void *);
|
// void errorHandler(SalticidaeCError *, bool, int32_t, void *);
|
||||||
import "C"
|
import "C"
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
@ -130,14 +130,14 @@ func onTerm(C.int, unsafe.Pointer) {
|
||||||
}
|
}
|
||||||
|
|
||||||
//export errorHandler
|
//export errorHandler
|
||||||
func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, _ unsafe.Pointer) {
|
func errorHandler(_err *C.struct_SalticidaeCError, fatal C.bool, asyncID C.int32_t, _ unsafe.Pointer) {
|
||||||
err := (*salticidae.Error)(unsafe.Pointer(_err))
|
err := (*salticidae.Error)(unsafe.Pointer(_err))
|
||||||
if fatal {
|
if fatal {
|
||||||
MainNode.Log.Fatal("Error during async call: %s", salticidae.StrError(err.GetCode()))
|
MainNode.Log.Fatal("Error during async call: %s", salticidae.StrError(err.GetCode()))
|
||||||
MainNode.EC.Stop()
|
MainNode.EC.Stop()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
MainNode.Log.Error("Error during async call: %s", salticidae.StrError(err.GetCode()))
|
MainNode.Log.Debug("Error during async with ID %d call: %s", asyncID, salticidae.StrError(err.GetCode()))
|
||||||
}
|
}
|
||||||
|
|
||||||
func (n *Node) initNetlib() error {
|
func (n *Node) initNetlib() error {
|
||||||
|
@ -152,6 +152,8 @@ func (n *Node) initNetlib() error {
|
||||||
|
|
||||||
// Create peer network config, may have tls enabled
|
// Create peer network config, may have tls enabled
|
||||||
peerConfig := salticidae.NewPeerNetworkConfig()
|
peerConfig := salticidae.NewPeerNetworkConfig()
|
||||||
|
peerConfig.ConnTimeout(60)
|
||||||
|
|
||||||
msgConfig := peerConfig.AsMsgNetworkConfig()
|
msgConfig := peerConfig.AsMsgNetworkConfig()
|
||||||
msgConfig.MaxMsgSize(maxMessageSize)
|
msgConfig.MaxMsgSize(maxMessageSize)
|
||||||
|
|
||||||
|
@ -254,7 +256,7 @@ func (n *Node) StartConsensusServer() error {
|
||||||
// Listen for P2P messages
|
// Listen for P2P messages
|
||||||
n.PeerNet.Listen(serverIP, &err)
|
n.PeerNet.Listen(serverIP, &err)
|
||||||
if code := err.GetCode(); code != 0 {
|
if code := err.GetCode(); code != 0 {
|
||||||
return fmt.Errorf("failed to start consensus server: %s", salticidae.StrError(code))
|
return fmt.Errorf("failed to listen on consensus server at %s: %s", n.Config.StakingIP, salticidae.StrError(code))
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start a server to handle throughput tests if configuration says to. Disabled by default.
|
// Start a server to handle throughput tests if configuration says to. Disabled by default.
|
||||||
|
@ -268,7 +270,7 @@ func (n *Node) StartConsensusServer() error {
|
||||||
|
|
||||||
n.ClientNet.Listen(clientIP, &err)
|
n.ClientNet.Listen(clientIP, &err)
|
||||||
if code := err.GetCode(); code != 0 {
|
if code := err.GetCode(); code != 0 {
|
||||||
return fmt.Errorf("failed to listen on xput server: %s", salticidae.StrError(code))
|
return fmt.Errorf("failed to listen on xput server at 127.0.0.1:%d: %s", n.Config.ThroughputPort, salticidae.StrError(code))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
|
@ -28,6 +28,7 @@ borealis_bootstrap:
|
||||||
staking_tls_enabled: true
|
staking_tls_enabled: true
|
||||||
staking_tls_key_file: "/home/ubuntu/keys/staker.key"
|
staking_tls_key_file: "/home/ubuntu/keys/staker.key"
|
||||||
staking_tls_cert_file: "/home/ubuntu/keys/staker.crt"
|
staking_tls_cert_file: "/home/ubuntu/keys/staker.crt"
|
||||||
|
plugin_dir: "/home/ubuntu/go/src/github.com/ava-labs/gecko/build/plugins"
|
||||||
log_dir: "/home/ubuntu/.gecko"
|
log_dir: "/home/ubuntu/.gecko"
|
||||||
log_level: debug
|
log_level: debug
|
||||||
snow_sample_size: 3
|
snow_sample_size: 3
|
||||||
|
@ -73,6 +74,7 @@ borealis_node:
|
||||||
staking_tls_enabled: true
|
staking_tls_enabled: true
|
||||||
staking_tls_key_file: "/home/ubuntu/keys/staker.key"
|
staking_tls_key_file: "/home/ubuntu/keys/staker.key"
|
||||||
staking_tls_cert_file: "/home/ubuntu/keys/staker.crt"
|
staking_tls_cert_file: "/home/ubuntu/keys/staker.crt"
|
||||||
|
plugin_dir: "/home/ubuntu/go/src/github.com/ava-labs/gecko/build/plugins"
|
||||||
log_dir: "/home/ubuntu/.gecko"
|
log_dir: "/home/ubuntu/.gecko"
|
||||||
log_level: debug
|
log_level: debug
|
||||||
snow_sample_size: 3
|
snow_sample_size: 3
|
||||||
|
|
|
@ -0,0 +1,9 @@
|
||||||
|
|
||||||
|
---
|
||||||
|
- name: Update the network
|
||||||
|
connection: ssh
|
||||||
|
gather_facts: false
|
||||||
|
hosts: all
|
||||||
|
tasks:
|
||||||
|
- name: Kill Node
|
||||||
|
command: killall ava
|
|
@ -0,0 +1,11 @@
|
||||||
|
|
||||||
|
---
|
||||||
|
- name: Update the network
|
||||||
|
connection: ssh
|
||||||
|
gather_facts: false
|
||||||
|
hosts: all
|
||||||
|
tasks:
|
||||||
|
- name: Ping node
|
||||||
|
shell: "ls"
|
||||||
|
environment:
|
||||||
|
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
|
@ -7,8 +7,8 @@
|
||||||
vars:
|
vars:
|
||||||
ava_binary: ~/go/src/github.com/ava-labs/gecko/build/ava
|
ava_binary: ~/go/src/github.com/ava-labs/gecko/build/ava
|
||||||
repo_folder: ~/go/src/github.com/ava-labs/gecko
|
repo_folder: ~/go/src/github.com/ava-labs/gecko
|
||||||
repo_name: ava-labs/gecko
|
repo_name: ava-labs/gecko-internal
|
||||||
repo_branch: cascade
|
repo_branch: retry-connections
|
||||||
tasks:
|
tasks:
|
||||||
- name: Kill Node
|
- name: Kill Node
|
||||||
command: killall ava
|
command: killall ava
|
||||||
|
@ -33,6 +33,6 @@
|
||||||
path: "{{ log_dir }}"
|
path: "{{ log_dir }}"
|
||||||
state: absent
|
state: absent
|
||||||
- name: Start node
|
- name: Start node
|
||||||
shell: "nohup {{ ava_binary }} --network-id={{ network_id }} --api-admin-enabled={{ api_admin_enabled }} --api-keystore-enabled={{ api_keystore_enabled }} --api-metrics-enabled={{ api_metrics_enabled }} --ava-tx-fee={{ ava_tx_fee }} --assertions-enabled={{ assertions_enabled }} --signature-verification-enabled={{ signature_verification_enabled }} --db-enabled={{ db_enabled }} --db-dir={{ db_dir }} --http-port={{ http_port }} --http-tls-enabled={{ http_tls_enabled }} --http-tls-key-file={{ http_tls_key_file }} --http-tls-cert-file={{ http_tls_cert_file }} --bootstrap-ips={{ bootstrap_ips }} --bootstrap-ids={{ bootstrap_ids }} --public-ip={{ ansible_host }} --staking-port={{ staking_port }} --staking-tls-enabled={{ staking_tls_enabled }} --staking-tls-key-file={{ staking_tls_key_file }} --staking-tls-cert-file={{ staking_tls_cert_file }} --log-dir={{ log_dir }} --log-level={{ log_level }} --snow-sample-size={{ snow_sample_size }} --snow-quorum-size={{ snow_quorum_size }} --snow-virtuous-commit-threshold={{ snow_virtuous_commit_threshold }} --snow-rogue-commit-threshold={{ snow_rogue_commit_threshold }} --snow-avalanche-num-parents={{ snow_avalanche_num_parents }} --snow-avalanche-batch-size={{ snow_avalanche_batch_size }} --api-ipcs-enabled={{ api_ipcs_enabled }} --xput-server-enabled={{ xput_server_enabled }} --xput-server-port={{ xput_server_port }} >/dev/null 2>&1 &"
|
shell: "nohup {{ ava_binary }} --network-id={{ network_id }} --api-admin-enabled={{ api_admin_enabled }} --api-keystore-enabled={{ api_keystore_enabled }} --api-metrics-enabled={{ api_metrics_enabled }} --ava-tx-fee={{ ava_tx_fee }} --assertions-enabled={{ assertions_enabled }} --signature-verification-enabled={{ signature_verification_enabled }} --db-enabled={{ db_enabled }} --db-dir={{ db_dir }} --http-port={{ http_port }} --http-tls-enabled={{ http_tls_enabled }} --http-tls-key-file={{ http_tls_key_file }} --http-tls-cert-file={{ http_tls_cert_file }} --bootstrap-ips={{ bootstrap_ips }} --bootstrap-ids={{ bootstrap_ids }} --public-ip={{ ansible_host }} --staking-port={{ staking_port }} --staking-tls-enabled={{ staking_tls_enabled }} --staking-tls-key-file={{ staking_tls_key_file }} --staking-tls-cert-file={{ staking_tls_cert_file }} --plugin-dir={{ plugin_dir }} --log-dir={{ log_dir }} --log-level={{ log_level }} --snow-sample-size={{ snow_sample_size }} --snow-quorum-size={{ snow_quorum_size }} --snow-virtuous-commit-threshold={{ snow_virtuous_commit_threshold }} --snow-rogue-commit-threshold={{ snow_rogue_commit_threshold }} --snow-avalanche-num-parents={{ snow_avalanche_num_parents }} --snow-avalanche-batch-size={{ snow_avalanche_batch_size }} --api-ipcs-enabled={{ api_ipcs_enabled }} --xput-server-enabled={{ xput_server_enabled }} --xput-server-port={{ xput_server_port }} >/dev/null 2>&1 &"
|
||||||
environment:
|
environment:
|
||||||
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
||||||
|
|
|
@ -7,8 +7,8 @@
|
||||||
vars:
|
vars:
|
||||||
ava_binary: ~/go/src/github.com/ava-labs/gecko/build/ava
|
ava_binary: ~/go/src/github.com/ava-labs/gecko/build/ava
|
||||||
repo_folder: ~/go/src/github.com/ava-labs/gecko
|
repo_folder: ~/go/src/github.com/ava-labs/gecko
|
||||||
repo_name: ava-labs/gecko
|
repo_name: ava-labs/gecko-internal
|
||||||
repo_branch: cascade
|
repo_branch: retry-connections
|
||||||
tasks:
|
tasks:
|
||||||
- name: Kill Node
|
- name: Kill Node
|
||||||
command: killall ava
|
command: killall ava
|
||||||
|
@ -25,6 +25,6 @@
|
||||||
environment:
|
environment:
|
||||||
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
||||||
- name: Start node
|
- name: Start node
|
||||||
shell: "nohup {{ ava_binary }} --network-id={{ network_id }} --api-admin-enabled={{ api_admin_enabled }} --api-keystore-enabled={{ api_keystore_enabled }} --api-metrics-enabled={{ api_metrics_enabled }} --ava-tx-fee={{ ava_tx_fee }} --assertions-enabled={{ assertions_enabled }} --signature-verification-enabled={{ signature_verification_enabled }} --db-enabled={{ db_enabled }} --db-dir={{ db_dir }} --http-port={{ http_port }} --http-tls-enabled={{ http_tls_enabled }} --http-tls-key-file={{ http_tls_key_file }} --http-tls-cert-file={{ http_tls_cert_file }} --bootstrap-ips={{ bootstrap_ips }} --bootstrap-ids={{ bootstrap_ids }} --public-ip={{ ansible_host }} --staking-port={{ staking_port }} --staking-tls-enabled={{ staking_tls_enabled }} --staking-tls-key-file={{ staking_tls_key_file }} --staking-tls-cert-file={{ staking_tls_cert_file }} --log-dir={{ log_dir }} --log-level={{ log_level }} --snow-sample-size={{ snow_sample_size }} --snow-quorum-size={{ snow_quorum_size }} --snow-virtuous-commit-threshold={{ snow_virtuous_commit_threshold }} --snow-rogue-commit-threshold={{ snow_rogue_commit_threshold }} --snow-avalanche-num-parents={{ snow_avalanche_num_parents }} --snow-avalanche-batch-size={{ snow_avalanche_batch_size }} --api-ipcs-enabled={{ api_ipcs_enabled }} --xput-server-enabled={{ xput_server_enabled }} --xput-server-port={{ xput_server_port }} >/dev/null 2>&1 &"
|
shell: "nohup {{ ava_binary }} --network-id={{ network_id }} --api-admin-enabled={{ api_admin_enabled }} --api-keystore-enabled={{ api_keystore_enabled }} --api-metrics-enabled={{ api_metrics_enabled }} --ava-tx-fee={{ ava_tx_fee }} --assertions-enabled={{ assertions_enabled }} --signature-verification-enabled={{ signature_verification_enabled }} --db-enabled={{ db_enabled }} --db-dir={{ db_dir }} --http-port={{ http_port }} --http-tls-enabled={{ http_tls_enabled }} --http-tls-key-file={{ http_tls_key_file }} --http-tls-cert-file={{ http_tls_cert_file }} --bootstrap-ips={{ bootstrap_ips }} --bootstrap-ids={{ bootstrap_ids }} --public-ip={{ ansible_host }} --staking-port={{ staking_port }} --staking-tls-enabled={{ staking_tls_enabled }} --staking-tls-key-file={{ staking_tls_key_file }} --staking-tls-cert-file={{ staking_tls_cert_file }} --plugin-dir={{ plugin_dir }} --log-dir={{ log_dir }} --log-level={{ log_level }} --snow-sample-size={{ snow_sample_size }} --snow-quorum-size={{ snow_quorum_size }} --snow-virtuous-commit-threshold={{ snow_virtuous_commit_threshold }} --snow-rogue-commit-threshold={{ snow_rogue_commit_threshold }} --snow-avalanche-num-parents={{ snow_avalanche_num_parents }} --snow-avalanche-batch-size={{ snow_avalanche_batch_size }} --api-ipcs-enabled={{ api_ipcs_enabled }} --xput-server-enabled={{ xput_server_enabled }} --xput-server-port={{ xput_server_port }} >/dev/null 2>&1 &"
|
||||||
environment:
|
environment:
|
||||||
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
PATH: /sbin:/usr/sbin:/bin:/usr/bin:/usr/local/bin:/snap/bin
|
||||||
|
|
|
@ -207,6 +207,7 @@ func (b *bootstrapper) finish() {
|
||||||
func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge) {
|
func (b *bootstrapper) executeAll(jobs *queue.Jobs, numBlocked prometheus.Gauge) {
|
||||||
for job, err := jobs.Pop(); err == nil; job, err = jobs.Pop() {
|
for job, err := jobs.Pop(); err == nil; job, err = jobs.Pop() {
|
||||||
numBlocked.Dec()
|
numBlocked.Dec()
|
||||||
|
b.BootstrapConfig.Context.Log.Debug("Executing: %s", job.ID())
|
||||||
if err := jobs.Execute(job); err != nil {
|
if err := jobs.Execute(job); err != nil {
|
||||||
b.BootstrapConfig.Context.Log.Warn("Error executing: %s", err)
|
b.BootstrapConfig.Context.Log.Warn("Error executing: %s", err)
|
||||||
}
|
}
|
||||||
|
|
|
@ -79,12 +79,23 @@ func (v *voter) bubbleVotes(votes ids.UniqueBag) ids.UniqueBag {
|
||||||
vtx := vts[0]
|
vtx := vts[0]
|
||||||
vts = vts[1:]
|
vts = vts[1:]
|
||||||
|
|
||||||
if status := vtx.Status(); status.Fetched() && !v.t.Consensus.VertexIssued(vtx) {
|
status := vtx.Status()
|
||||||
vts = append(vts, vtx.Parents()...)
|
if !status.Fetched() {
|
||||||
} else if !status.Decided() && v.t.Consensus.VertexIssued(vtx) {
|
v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is unknown", set.Len(), vtx.ID())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if status.Decided() {
|
||||||
|
v.t.Config.Context.Log.Verbo("Dropping %d vote(s) for %s because the vertex is decided", set.Len(), vtx.ID())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if v.t.Consensus.VertexIssued(vtx) {
|
||||||
|
v.t.Config.Context.Log.Verbo("Applying %d vote(s) for %s", set.Len(), vtx.ID())
|
||||||
bubbledVotes.UnionSet(vtx.ID(), set)
|
bubbledVotes.UnionSet(vtx.ID(), set)
|
||||||
} else {
|
} else {
|
||||||
v.t.Config.Context.Log.Debug("Dropping %d vote(s) for %s because the vertex is invalid", set.Len(), vtx.ID())
|
v.t.Config.Context.Log.Verbo("Bubbling %d vote(s) for %s because the vertex isn't issued", set.Len(), vtx.ID())
|
||||||
|
vts = append(vts, vtx.Parents()...)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue