From 4b4c431d67fffd188443e08ef70e5ee98f55a42d Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Wed, 13 Sep 2017 14:38:06 +0200 Subject: [PATCH] server: add NotifyWhenOnline method. This commit adds a listener queue for each peer, that can be used to queue listeners that will be notified when the targetted peer eventually comes online. --- server.go | 44 ++++++++++++++++++++++++++++++++++++++++---- 1 file changed, 40 insertions(+), 4 deletions(-) diff --git a/server.go b/server.go index e1e60035..2fd67771 100644 --- a/server.go +++ b/server.go @@ -57,6 +57,8 @@ type server struct { inboundPeers map[string]*peer outboundPeers map[string]*peer + peerConnectedListeners map[string][]chan<- struct{} + persistentPeers map[string]struct{} persistentConnReqs map[string][]*connmgr.ConnReq @@ -134,10 +136,11 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, persistentPeers: make(map[string]struct{}), persistentConnReqs: make(map[string][]*connmgr.ConnReq), - peersByID: make(map[int32]*peer), - peersByPub: make(map[string]*peer), - inboundPeers: make(map[string]*peer), - outboundPeers: make(map[string]*peer), + peersByID: make(map[int32]*peer), + peersByPub: make(map[string]*peer), + inboundPeers: make(map[string]*peer), + outboundPeers: make(map[string]*peer), + peerConnectedListeners: make(map[string][]chan<- struct{}), globalFeatures: globalFeatures, localFeatures: localFeatures, @@ -860,6 +863,33 @@ func (s *server) SendToPeer(target *btcec.PublicKey, return s.sendToPeer(target, msgs) } +// NotifyWhenOnline can be called by other subsystems to get notified when a +// particular peer comes online. +// +// NOTE: This function is safe for concurrent access. +func (s *server) NotifyWhenOnline(peer *btcec.PublicKey, + connectedChan chan<- struct{}) { + s.mu.Lock() + defer s.mu.Unlock() + + // Compute the target peer's identifier. + pubStr := string(peer.SerializeCompressed()) + + // Check if peer is connected. + _, ok := s.peersByPub[pubStr] + if ok { + // Connected, can return early. + srvrLog.Debugf("Notifying that peer %v is online", pubStr) + close(connectedChan) + return + } + + // Not connected, store this listener such that it can be notified when + // the peer comes online. + s.peerConnectedListeners[pubStr] = append( + s.peerConnectedListeners[pubStr], connectedChan) +} + // sendToPeer is an internal method that delivers messages to the specified // `target` peer. func (s *server) sendToPeer(target *btcec.PublicKey, @@ -1272,6 +1302,12 @@ func (s *server) addPeer(p *peer) { // channel router so we can synchronize our view of the channel graph // with this new peer. go s.authGossiper.SynchronizeNode(p.addr.IdentityKey) + + // Check if there are listeners waiting for this peer to come online. + for _, con := range s.peerConnectedListeners[pubStr] { + close(con) + } + delete(s.peerConnectedListeners, pubStr) } // removePeer removes the passed peer from the server's state of all active