diff --git a/server.go b/server.go
index 702be32a..00c9c730 100644
--- a/server.go
+++ b/server.go
@@ -48,13 +48,15 @@ type server struct {
 	// long-term identity private key.
 	lightningID [32]byte
 
-	peersMtx   sync.RWMutex
+	mu         sync.Mutex
 	peersByID  map[int32]*peer
 	peersByPub map[string]*peer
 
-	persistentPeers map[string]struct{}
-	inboundPeers    map[string]*peer
-	outboundPeers   map[string]*peer
+	inboundPeers  map[string]*peer
+	outboundPeers map[string]*peer
+
+	persistentPeers    map[string]struct{}
+	persistentConnReqs map[string][]*connmgr.ConnReq
 
 	cc *chainControl
 
@@ -76,16 +78,6 @@ type server struct {
 
 	connMgr *connmgr.ConnManager
 
-	pendingConnMtx     sync.RWMutex
-	persistentConnReqs map[string][]*connmgr.ConnReq
-
-	broadcastRequests chan *broadcastReq
-	sendRequests      chan *sendReq
-
-	newPeers  chan *peer
-	donePeers chan *peer
-	queries   chan interface{}
-
 	// globalFeatures feature vector which affects HTLCs and thus are also
 	// advertised to other nodes.
 	globalFeatures *lnwire.FeatureVector
@@ -97,11 +89,9 @@ type server struct {
 	// currentNodeAnn is the node announcement that has been broadcast to
 	// the network upon startup, if the attributes of the node (us) has
 	// changed since last start.
-	annMtx         sync.Mutex
 	currentNodeAnn *lnwire.NodeAnnouncement
 
-	wg   sync.WaitGroup
-	quit chan struct{}
+	wg sync.WaitGroup
 }
 
 // newServer creates a new instance of the server which is to listen using the
@@ -145,17 +135,8 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
 		inboundPeers:  make(map[string]*peer),
 		outboundPeers: make(map[string]*peer),
 
-		newPeers:  make(chan *peer, 10),
-		donePeers: make(chan *peer, 10),
-
-		broadcastRequests: make(chan *broadcastReq),
-		sendRequests:      make(chan *sendReq),
-
 		globalFeatures: globalFeatures,
 		localFeatures:  localFeatures,
-
-		queries: make(chan interface{}),
-		quit:    make(chan struct{}),
 	}
 
 	// If the debug HTLC flag is on, then we invoice a "master debug"
@@ -172,17 +153,24 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
 		LocalChannelClose: func(pubKey []byte,
 			request *htlcswitch.ChanClose) {
 
-			s.peersMtx.RLock()
-			peer, ok := s.peersByPub[string(pubKey)]
-			s.peersMtx.RUnlock()
-
-			if !ok {
-				srvrLog.Error("unable to close channel, peer"+
-					" with %v id can't be found", pubKey)
+			peer, err := s.FindPeerByPubStr(string(pubKey))
+			if err != nil {
+				srvrLog.Errorf("unable to close channel, peer"+
+					" with %v id can't be found: %v",
+					pubKey, err,
+				)
 				return
 			}
 
-			peer.localCloseChanReqs <- request
+			select {
+			case peer.localCloseChanReqs <- request:
+				srvrLog.Infof("local close channel request "+
+					"delivered to peer: %v", string(pubKey))
+			case <-peer.quit:
+				srvrLog.Errorf("unable to deliver local close "+
+					"channel request to peer %v: peer "+
+					"shutting down", string(pubKey))
+			}
 		},
 		UpdateTopology: func(msg *lnwire.ChannelUpdate) error {
 			s.discoverSrv.ProcessRemoteAnnouncement(msg, nil)
@@ -276,10 +264,10 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
 	}
 
 	s.discoverSrv, err = discovery.New(discovery.Config{
-		Broadcast:        s.broadcastMessage,
+		Broadcast:        s.BroadcastMessage,
 		Notifier:         s.cc.chainNotifier,
 		Router:           s.chanRouter,
-		SendToPeer:       s.sendToPeer,
+		SendToPeer:       s.SendToPeer,
 		TrickleDelay:     time.Millisecond * 300,
 		ProofMatureDelta: 0,
 		DB:               chanDB,
@@ -299,12 +287,12 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl,
 	// incoming connections
 	cmgr, err := connmgr.New(&connmgr.Config{
 		Listeners:      listeners,
-		OnAccept:       s.inboundPeerConnected,
+		OnAccept:       s.InboundPeerConnected,
 		RetryDuration:  time.Second * 5,
 		TargetOutbound: 100,
 		GetNewAddress:  nil,
 		Dial:           noiseDial(s.identityPriv),
-		OnConnection:   s.outboundPeerConnected,
+		OnConnection:   s.OutboundPeerConnected,
 	})
 	if err != nil {
 		return nil, err
 	}
@@ -315,12 +303,14 @@
 }
 
 // Started returns true if the server has been started, and false otherwise.
+// NOTE: This function is safe for concurrent access.
 func (s *server) Started() bool {
 	return atomic.LoadInt32(&s.started) != 0
 }
 
 // Start starts the main daemon server, all requested listeners, and any helper
 // goroutines.
+// NOTE: This function is safe for concurrent access.
 func (s *server) Start() error {
 	// Already running?
 	if !atomic.CompareAndSwapInt32(&s.started, 0, 1) {
@@ -352,9 +342,6 @@ func (s *server) Start() error {
 		return err
 	}
 
-	s.wg.Add(1)
-	go s.queryHandler()
-
 	// With all the relevant sub-systems started, we'll now attempt to
 	// establish persistent connections to our direct channel collaborators
 	// within the network.
@@ -362,12 +349,15 @@ func (s *server) Start() error {
 		return err
 	}
 
+	go s.connMgr.Start()
+
 	return nil
 }
 
 // Stop gracefully shutsdown the main daemon server. This function will signal
 // any active goroutines, or helper objects to exit, then blocks until they've
 // all successfully exited. Additionally, any/all listeners are closed.
+// NOTE: This function is safe for concurrent access.
 func (s *server) Stop() error {
 	// Bail if we're already shutting down.
 	if !atomic.CompareAndSwapInt32(&s.shutdown, 0, 1) {
@@ -383,20 +373,40 @@ func (s *server) Stop() error {
 	s.discoverSrv.Stop()
 	s.cc.wallet.Shutdown()
 	s.cc.chainView.Stop()
+	s.connMgr.Stop()
 
-	// Signal all the lingering goroutines to quit.
-	close(s.quit)
+	// Disconnect from each active peer to ensure that the
+	// peerTerminationWatchers signal completion to each peer.
+	peers := s.Peers()
+	for _, peer := range peers {
+		s.DisconnectPeer(peer.addr.IdentityKey)
+	}
+
+	// Wait for all lingering goroutines to quit.
 	s.wg.Wait()
 
 	return nil
 }
 
+// Stopped returns true if the server has been instructed to shutdown.
+// NOTE: This function is safe for concurrent access.
+func (s *server) Stopped() bool {
+	return atomic.LoadInt32(&s.shutdown) != 0
+}
+
+// WaitForShutdown blocks until all goroutines have been stopped.
+func (s *server) WaitForShutdown() {
+	s.wg.Wait()
+}
+
 // genNodeAnnouncement generates and returns the current fully signed node
 // announcement. If refresh is true, then the time stamp of the announcement
 // will be updated in order to ensure it propagates through the network.
-func (s *server) genNodeAnnouncement(refresh bool) (lnwire.NodeAnnouncement, error) {
-	s.annMtx.Lock()
-	defer s.annMtx.Unlock()
+func (s *server) genNodeAnnouncement(
+	refresh bool) (lnwire.NodeAnnouncement, error) {
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
 	if !refresh {
 		return *s.currentNodeAnn, nil
@@ -460,8 +470,10 @@ func (s *server) establishPersistentConnections() error {
 	}
 	// TODO(roasbeef): instead iterate over link nodes and query graph for
 	// each of the nodes.
-	err = sourceNode.ForEachChannel(nil, func(_ *bolt.Tx,
-		_ *channeldb.ChannelEdgeInfo, policy *channeldb.ChannelEdgePolicy) error {
+	err = sourceNode.ForEachChannel(nil, func(
+		_ *bolt.Tx,
+		_ *channeldb.ChannelEdgeInfo,
+		policy *channeldb.ChannelEdgePolicy) error {
 
 		pubStr := string(policy.Node.PubKey.SerializeCompressed())
 
@@ -531,10 +543,8 @@ func (s *server) establishPersistentConnections() error {
 			Permanent: true,
 		}
 
-		s.pendingConnMtx.Lock()
-		s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr],
-			connReq)
-		s.pendingConnMtx.Unlock()
+		s.persistentConnReqs[pubStr] = append(
+			s.persistentConnReqs[pubStr], connReq)
 
 		go s.connMgr.Connect(connReq)
 	}
@@ -543,54 +553,56 @@
 
 	return nil
 }
 
-// WaitForShutdown blocks all goroutines have been stopped.
-func (s *server) WaitForShutdown() {
-	s.wg.Wait()
-}
-
-// broadcastReq is a message sent to the server by a related subsystem when it
-// wishes to broadcast one or more messages to all connected peers. Thi
-type broadcastReq struct {
-	ignore *btcec.PublicKey
-	msgs   []lnwire.Message
-
-	errChan chan error // MUST be buffered.
-}
-
-// broadcastMessage sends a request to the server to broadcast a set of
+// BroadcastMessage sends a request to the server to broadcast a set of
 // messages to all peers other than the one specified by the `skip` parameter.
-func (s *server) broadcastMessage(skip *btcec.PublicKey, msgs ...lnwire.Message) error {
-	errChan := make(chan error, 1)
+// NOTE: This function is safe for concurrent access.
+func (s *server) BroadcastMessage(
+	skip *btcec.PublicKey,
+	msgs ...lnwire.Message) error {
 
 	msgsToSend := make([]lnwire.Message, 0, len(msgs))
 	msgsToSend = append(msgsToSend, msgs...)
 
-	broadcastReq := &broadcastReq{
-		ignore:  skip,
-		msgs:    msgsToSend,
-		errChan: errChan,
-	}
-
-	select {
-	case s.broadcastRequests <- broadcastReq:
-	case <-s.quit:
-		return errors.New("server shutting down")
-	}
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
-	select {
-	case err := <-errChan:
-		return err
-	case <-s.quit:
-		return errors.New("server shutting down")
-	}
+	return s.broadcastMessages(skip, msgsToSend)
 }
 
-// sendReq is message sent to the server by a related subsystem which it
-// wishes to send a set of messages to a specified peer.
-type sendReq struct {
-	target *btcec.PublicKey
-	msgs   []lnwire.Message
+// broadcastMessages is an internal method that delivers messages to all active
+// peers except the one specified by `skip`.
+// NOTE: This method MUST be called while the server's mutex is locked.
+func (s *server) broadcastMessages(
+	skip *btcec.PublicKey,
+	msgs []lnwire.Message) error {
 
-	errChan chan error
+	srvrLog.Debugf("Broadcasting %v messages", len(msgs))
+
+	// Iterate over all known peers, dispatching a go routine to enqueue
+	// all messages to each of the peers. We synchronize access to
+	// peersByPub throughout this process to ensure we deliver messages to
+	// the exact set of peers present at the time of invocation.
+	var wg sync.WaitGroup
+	for _, sPeer := range s.peersByPub {
+		if skip != nil &&
+			sPeer.addr.IdentityKey.IsEqual(skip) {
+
+			srvrLog.Debugf("Skipping %v in broadcast",
+				skip.SerializeCompressed())
+
+			continue
+		}
+
+		// Dispatch a go routine to enqueue all messages to this peer.
+		wg.Add(1)
+		s.wg.Add(1)
+		go s.sendPeerMessages(sPeer, msgs, &wg)
+	}
+
+	// Wait for all messages to have been dispatched before returning to
+	// caller.
+	wg.Wait()
+
+	return nil
 }
 
 type nodeAddresses struct {
@@ -598,47 +610,113 @@ type nodeAddresses struct {
 	addresses []*net.TCPAddr
 }
 
-// sendToPeer send a message to the server telling it to send the specific set
+// SendToPeer sends a message to the server telling it to send the specific set
 // of message to a particular peer. If the peer connect be found, then this
 // method will return a non-nil error.
-func (s *server) sendToPeer(target *btcec.PublicKey, msgs ...lnwire.Message) error {
-	errChan := make(chan error, 1)
+// NOTE: This function is safe for concurrent access.
+func (s *server) SendToPeer(
+	target *btcec.PublicKey,
+	msgs ...lnwire.Message) error {
 
 	msgsToSend := make([]lnwire.Message, 0, len(msgs))
 	msgsToSend = append(msgsToSend, msgs...)
-	sMsg := &sendReq{
-		target:  target,
-		msgs:    msgsToSend,
-		errChan: errChan,
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.sendToPeer(target, msgsToSend)
+}
+
+// sendToPeer is an internal method that delivers messages to the specified
+// `target` peer.
+func (s *server) sendToPeer(
+	target *btcec.PublicKey,
+	msgs []lnwire.Message) error {
+
+	// Compute the target peer's identifier.
+	targetPubBytes := target.SerializeCompressed()
+
+	srvrLog.Infof("Attempting to send msgs %v to: %x",
+		len(msgs), targetPubBytes)
+
+	// Lookup intended target in peersByPub, returning an error to the
+	// caller if the peer is unknown. Access to peersByPub is synchronized
+	// here to ensure we consider the exact set of peers present at the
+	// time of invocation.
+	targetPeer, ok := s.peersByPub[string(targetPubBytes)]
+	if !ok {
+		srvrLog.Errorf("unable to send message to %x, "+
+			"peer not found", targetPubBytes)
+
+		return errors.New("peer not found")
 	}
 
-	select {
-	case s.sendRequests <- sMsg:
-	case <-s.quit:
-		return errors.New("server shutting down")
+	s.sendPeerMessages(targetPeer, msgs, nil)
+
+	return nil
+}
+
+// sendPeerMessages enqueues a list of messages into the outgoingQueue of the
+// `targetPeer`. This method supports additional broadcast-level
+// synchronization by using the additional `wg` to coordinate a particular
+// broadcast.
+//
+// NOTE: This method must be invoked with a non-nil `wg` if it is spawned as a
+// go routine--both `wg` and the server's WaitGroup should be incremented
+// beforehand. If this method is not spawned as a go routine, the provided `wg`
+// should be nil, and the server's WaitGroup should not be tracking this
+// invocation.
+func (s *server) sendPeerMessages(
+	targetPeer *peer,
+	msgs []lnwire.Message,
+	wg *sync.WaitGroup) {
+
+	// If a WaitGroup is provided, we assume that this method was spawned
+	// as a go routine, and that it is being tracked by both the server's
+	// WaitGroup, as well as the broadcast-level WaitGroup `wg`. In this
+	// event, we defer a call to Done on both WaitGroups to 1) ensure that
+	// the server will be able to shut down after its go routines exit,
+	// and 2) so the server can return to the caller of BroadcastMessage.
+	if wg != nil {
+		defer s.wg.Done()
+		defer wg.Done()
 	}
 
-	select {
-	case err := <-errChan:
-		return err
-	case <-s.quit:
-		return errors.New("server shutting down")
+	for _, msg := range msgs {
+		targetPeer.queueMsg(msg, nil)
 	}
 }
 
-// findPeer will return the peer that corresponds to the passed in public key.
+// FindPeer will return the peer that corresponds to the passed in public key.
 // This function is used by the funding manager, allowing it to update the
 // daemon's local representation of the remote peer.
-func (s *server) findPeer(peerKey *btcec.PublicKey) (*peer, error) {
+// NOTE: This function is safe for concurrent access.
+func (s *server) FindPeer(peerKey *btcec.PublicKey) (*peer, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	serializedIDKey := string(peerKey.SerializeCompressed())
 
-	s.peersMtx.RLock()
-	peer := s.peersByPub[serializedIDKey]
-	s.peersMtx.RUnlock()
+	return s.findPeer(serializedIDKey)
+}
 
+// FindPeerByPubStr will return the peer that corresponds to the passed peerID,
+// which should be a string representation of the peer's serialized, compressed
+// public key.
+// NOTE: This function is safe for concurrent access.
+func (s *server) FindPeerByPubStr(peerID string) (*peer, error) {
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
+	return s.findPeer(peerID)
+}
+
+// findPeer is an internal method that retrieves the specified peer from the
+// server's internal state.
+func (s *server) findPeer(peerID string) (*peer, error) {
+	peer := s.peersByPub[peerID]
 	if peer == nil {
-		return nil, errors.New("Peer not found. Pubkey: " +
-			string(peerKey.SerializeCompressed()))
+		return nil, errors.New("Peer not found. Pubkey: " + peerID)
 	}
 
 	return peer, nil
@@ -648,21 +726,20 @@ func (s *server) findPeer(peerKey *btcec.PublicKey) (*peer, error) {
 // cleans up all resources allocated to the peer, notifies relevant sub-systems
 // of its demise, and finally handles re-connecting to the peer if it's
 // persistent.
-//
-// NOTE: This MUST be launched as a goroutine.
+// NOTE: This MUST be launched as a goroutine AND the _peer's_ WaitGroup should
+// be incremented before spawning this method, as it will signal to the peer's
+// WaitGroup upon completion.
 func (s *server) peerTerminationWatcher(p *peer) {
+	defer p.wg.Done()
+
 	p.WaitForDisconnect()
 
 	srvrLog.Debugf("Peer %v has been disconnected", p)
 
 	// If the server is exiting then we can bail out early ourselves as all
 	// the other sub-systems will already be shutting down.
-	select {
-	case <-s.quit:
+	if s.Stopped() {
 		return
-	default:
-		// If we aren't shutting down, then we'll fall through this
-		// this empty default case.
 	}
 
 	// Tell the switch to remove all links associated with this peer.
@@ -684,7 +761,7 @@
 	}
 
 	// Send the peer to be garbage collected by the server.
-	p.server.donePeers <- p
+	s.removePeer(p)
 
 	// If this peer had an active persistent connection request, then we
 	// can remove this as we manually decide below if we should attempt to
@@ -695,9 +772,7 @@
 	// Next, check to see if this is a persistent peer or not.
 	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
 
-	s.pendingConnMtx.RLock()
 	_, ok := s.persistentPeers[pubStr]
-	s.pendingConnMtx.RUnlock()
 	if ok {
 		srvrLog.Debugf("Attempting to re-establish persistent "+
 			"connection to peer %v", p)
@@ -710,7 +785,6 @@
 			Permanent: true,
 		}
 
-		s.pendingConnMtx.Lock()
 		// We'll only need to re-launch a connection requests if one
 		// isn't already currently pending.
 		if _, ok := s.persistentConnReqs[pubStr]; ok {
@@ -720,9 +794,8 @@
 		// Otherwise, we'll launch a new connection requests in order
 		// to attempt to maintain a persistent connection with this
 		// peer.
-		s.persistentConnReqs[pubStr] = append(s.persistentConnReqs[pubStr],
-			connReq)
-		s.pendingConnMtx.Unlock()
+		s.persistentConnReqs[pubStr] = append(
+			s.persistentConnReqs[pubStr], connReq)
 
 		go s.connMgr.Connect(connReq)
 	}
@@ -731,7 +804,11 @@
 // peerConnected is a function that handles initialization a newly connected
 // peer by adding it to the server's global list of all active peers, and
 // starting all the goroutines the peer needs to function properly.
-func (s *server) peerConnected(conn net.Conn, connReq *connmgr.ConnReq, inbound bool) {
+func (s *server) peerConnected(
+	conn net.Conn,
+	connReq *connmgr.ConnReq,
+	inbound bool) {
+
 	brontideConn := conn.(*brontide.Conn)
 	peerAddr := &lnwire.NetAddress{
 		IdentityKey: brontideConn.RemotePub(),
@@ -757,7 +834,7 @@
 		return
 	}
 
-	s.newPeers <- p
+	s.addPeer(p)
 }
 
 // shouldDropConnection determines if our local connection to a remote peer
@@ -770,22 +847,31 @@ func shouldDropLocalConnection(local, remote *btcec.PublicKey) bool {
 	localPubBytes := local.SerializeCompressed()
 	remotePubPbytes := remote.SerializeCompressed()
 
-	// The connection that comes from the node with a "smaller" pubkey should
-	// be kept. Therefore, if our pubkey is "greater" than theirs, we should
-	// drop our established connection.
+	// The connection that comes from the node with a "smaller" pubkey
+	// should be kept. Therefore, if our pubkey is "greater" than theirs, we
+	// should drop our established connection.
 	return bytes.Compare(localPubBytes, remotePubPbytes) > 0
 }
 
-// inboundPeerConnected initializes a new peer in response to a new inbound
+// InboundPeerConnected initializes a new peer in response to a new inbound
 // connection.
-func (s *server) inboundPeerConnected(conn net.Conn) {
-	s.peersMtx.Lock()
-	defer s.peersMtx.Unlock()
+//
+// NOTE: This function is safe for concurrent access.
+func (s *server) InboundPeerConnected(conn net.Conn) {
+	// Exit early if we have already been instructed to shutdown; this
+	// prevents any delayed callbacks from accidentally registering peers.
+	if s.Stopped() {
+		return
+	}
+
+	nodePub := conn.(*brontide.Conn).RemotePub()
+	pubStr := string(nodePub.SerializeCompressed())
+
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
 	// If we already have an inbound connection to this peer, then ignore
 	// this new connection.
-	nodePub := conn.(*brontide.Conn).RemotePub()
-	pubStr := string(nodePub.SerializeCompressed())
 	if _, ok := s.inboundPeers[pubStr]; ok {
 		srvrLog.Debugf("Ignoring duplicate inbound connection")
 		conn.Close()
@@ -817,34 +903,40 @@
 		srvrLog.Debugf("Disconnecting stale connection to %v",
 			connectedPeer)
 		connectedPeer.Disconnect(errors.New("remove stale connection"))
-		s.donePeers <- connectedPeer
+
+		s.removePeer(connectedPeer)
 	}
 
 	// Next, check to see if we have any outstanding persistent connection
 	// requests to this peer. If so, then we'll remove all of these
 	// connection requests, and also delete the entry from the map.
-	s.pendingConnMtx.Lock()
 	if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
 		for _, connReq := range connReqs {
 			s.connMgr.Remove(connReq.ID())
 		}
 		delete(s.persistentConnReqs, pubStr)
 	}
-	s.pendingConnMtx.Unlock()
 
-	go s.peerConnected(conn, nil, false)
+	s.peerConnected(conn, nil, false)
 }
 
-// outboundPeerConnected initializes a new peer in response to a new outbound
+// OutboundPeerConnected initializes a new peer in response to a new outbound
 // connection.
-func (s *server) outboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
-	s.peersMtx.Lock()
-	defer s.peersMtx.Unlock()
+// NOTE: This function is safe for concurrent access.
+func (s *server) OutboundPeerConnected(connReq *connmgr.ConnReq, conn net.Conn) {
+	// Exit early if we have already been instructed to shutdown; this
+	// prevents any delayed callbacks from accidentally registering peers.
+	if s.Stopped() {
+		return
+	}
 
 	localPub := s.identityPriv.PubKey()
 	nodePub := conn.(*brontide.Conn).RemotePub()
 	pubStr := string(nodePub.SerializeCompressed())
 
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	// If we already have an outbound connection to this peer, then ignore
 	// this new connection.
 	if _, ok := s.outboundPeers[pubStr]; ok {
@@ -864,7 +956,6 @@
 	// As we've just established an outbound connection to this peer, we'll
 	// cancel all other persistent connection requests and eliminate the
 	// entry for this peer from the map.
-	s.pendingConnMtx.Lock()
 	if connReqs, ok := s.persistentConnReqs[pubStr]; ok {
 		for _, pConnReq := range connReqs {
 			if pConnReq.ID() != connReq.ID() {
@@ -873,7 +964,6 @@
 		}
 		delete(s.persistentConnReqs, pubStr)
 	}
-	s.pendingConnMtx.Unlock()
 
 	// If we already have an inbound connection from this peer, then we'll
 	// check to see _which_ of our connections should be dropped.
@@ -896,10 +986,11 @@
 		srvrLog.Debugf("Disconnecting stale connection to %v",
 			connectedPeer)
 		connectedPeer.Disconnect(errors.New("remove stale connection"))
-		s.donePeers <- connectedPeer
+
+		s.removePeer(connectedPeer)
 	}
 
-	go s.peerConnected(conn, connReq, true)
+	s.peerConnected(conn, connReq, true)
 }
 
 // addPeer adds the passed peer to the server's global state of all active
@@ -919,7 +1010,6 @@
 	// according to its public key, or it's peer ID.
 	// TODO(roasbeef): pipe all requests through to the
 	// queryHandler/peerManager
-	s.peersMtx.Lock()
 
 	pubStr := string(p.addr.IdentityKey.SerializeCompressed())
 
@@ -932,11 +1022,12 @@
 		s.outboundPeers[pubStr] = p
 	}
 
-	s.peersMtx.Unlock()
-
 	// Launch a goroutine to watch for the termination of this peer so we
 	// can ensure all resources are properly cleaned up and if need be
-	// connections are re-established.
+	// connections are re-established. The go routine is tracked by the
+	// _peer's_ WaitGroup so that a call to Disconnect will block until the
+	// `peerTerminationWatcher` has exited.
+	p.wg.Add(1)
 	go s.peerTerminationWatcher(p)
 
 	// Once the peer has been added to our indexes, send a message to the
@@ -948,15 +1039,12 @@
 // removePeer removes the passed peer from the server's state of all active
 // peers.
 func (s *server) removePeer(p *peer) {
-	s.peersMtx.Lock()
-	defer s.peersMtx.Unlock()
-
-	srvrLog.Debugf("removing peer %v", p)
-
 	if p == nil {
 		return
 	}
 
+	srvrLog.Debugf("removing peer %v", p)
+
 	// As the peer is now finished, ensure that the TCP connection is
 	// closed and all of its related goroutines have exited.
 	p.Disconnect(errors.New("remove peer"))
@@ -978,24 +1066,6 @@
 	}
 }
 
-// connectPeerMsg is a message requesting the server to open a connection to a
-// particular peer. This message also houses an error channel which will be
-// used to report success/failure.
-type connectPeerMsg struct {
-	addr       *lnwire.NetAddress
-	persistent bool
-
-	err chan error
-}
-
-// disconnectPeerMsg is a message requesting the server to disconnect from an
-// active peer.
-type disconnectPeerMsg struct {
-	pubKey *btcec.PublicKey
-
-	err chan error
-}
-
 // openChanReq is a message sent to the server in order to request the
 // initiation of a channel funding workflow to the peer with either the
 // specified relative peer ID, or a global lightning ID.
@@ -1016,211 +1086,125 @@ type openChanReq struct {
 	err chan error
 }
 
-// queryHandler handles any requests to modify the server's internal state of
-// all active peers, or query/mutate the server's global state. Additionally,
-// any queries directed at peers will be handled by this goroutine.
-//
-// NOTE: This MUST be run as a goroutine.
-func (s *server) queryHandler() {
-	go s.connMgr.Start()
+// ConnectToPeer requests that the server connect to a Lightning Network peer
+// at the specified address. This function will *block* until either a
+// connection is established, or the initial handshake process fails.
+// NOTE: This function is safe for concurrent access.
+func (s *server) ConnectToPeer(addr *lnwire.NetAddress, perm bool) error {
 
-out:
-	for {
-		select {
-		// New peers.
-		case p := <-s.newPeers:
-			s.addPeer(p)
+	targetPub := string(addr.IdentityKey.SerializeCompressed())
 
-		// Finished peers.
-		case p := <-s.donePeers:
-			s.removePeer(p)
-
-		case bMsg := <-s.broadcastRequests:
-			ignore := bMsg.ignore
-
-			srvrLog.Debugf("Broadcasting %v messages", len(bMsg.msgs))
-
-			// Launch a new goroutine to handle the broadcast
-			// request, this allows us process this request
-			// asynchronously without blocking subsequent broadcast
-			// requests.
-			go func() {
-				s.peersMtx.RLock()
-				for _, sPeer := range s.peersByPub {
-					if ignore != nil &&
-						sPeer.addr.IdentityKey.IsEqual(ignore) {
-
-						srvrLog.Debugf("Skipping %v in broadcast",
-							ignore.SerializeCompressed())
-
-						continue
-					}
-
-					go func(p *peer) {
-						for _, msg := range bMsg.msgs {
-							p.queueMsg(msg, nil)
-						}
-					}(sPeer)
-				}
-				s.peersMtx.RUnlock()
-
-				bMsg.errChan <- nil
-			}()
-		case sMsg := <-s.sendRequests:
-			// TODO(roasbeef): use [33]byte everywhere instead
-			//  * eliminate usage of mutexes, funnel all peer
-			//  mutation to this goroutine
-			target := sMsg.target.SerializeCompressed()
-
-			srvrLog.Debugf("Attempting to send msgs %v to: %x",
-				len(sMsg.msgs), target)
-
-			// Launch a new goroutine to handle this send request,
-			// this allows us process this request asynchronously
-			// without blocking future send requests.
-			go func() {
-				s.peersMtx.RLock()
-				targetPeer, ok := s.peersByPub[string(target)]
-				if !ok {
-					s.peersMtx.RUnlock()
-					srvrLog.Errorf("unable to send message to %x, "+
-						"peer not found", target)
-					sMsg.errChan <- errors.New("peer not found")
-					return
-				}
-				s.peersMtx.RUnlock()
-
-				sMsg.errChan <- nil
-
-				for _, msg := range sMsg.msgs {
-					targetPeer.queueMsg(msg, nil)
-				}
-			}()
-		case query := <-s.queries:
-			switch msg := query.(type) {
-			case *disconnectPeerMsg:
-				s.handleDisconnectPeer(msg)
-			case *connectPeerMsg:
-				s.handleConnectPeer(msg)
-			case *openChanReq:
-				s.handleOpenChanReq(msg)
-			}
-		case <-s.quit:
-			break out
-		}
-	}
-
-	s.connMgr.Stop()
-
-	s.wg.Done()
-}
-
-// handleConnectPeer attempts to establish a connection to the address enclosed
-// within the passed connectPeerMsg. This function is *async*, a goroutine will
-// be spawned in order to finish the request, and respond to the caller.
-func (s *server) handleConnectPeer(msg *connectPeerMsg) {
-	addr := msg.addr
-
-	targetPub := string(msg.addr.IdentityKey.SerializeCompressed())
+	// Acquire mutex, but use explicit unlocking instead of defer for better
+	// granularity. In certain conditions, this method requires making an
+	// outbound connection to a remote peer, which requires the lock to be
+	// released, and subsequently reacquired.
+	s.mu.Lock()
 
 	// Ensure we're not already connected to this
 	// peer.
-	s.peersMtx.RLock()
 	peer, ok := s.peersByPub[targetPub]
 	if ok {
-		s.peersMtx.RUnlock()
-		msg.err <- fmt.Errorf("already connected to peer: %v", peer)
-		return
+		s.mu.Unlock()
+
+		return fmt.Errorf("already connected to peer: %v", peer)
 	}
-	s.peersMtx.RUnlock()
 
 	// If there's already a pending connection request for this pubkey,
 	// then we ignore this request to ensure we don't create a redundant
 	// connection.
-	s.pendingConnMtx.RLock()
 	if _, ok := s.persistentConnReqs[targetPub]; ok {
-		s.pendingConnMtx.RUnlock()
-		msg.err <- fmt.Errorf("connection attempt to %v is pending",
-			addr)
-		return
+		s.mu.Unlock()
+
+		return fmt.Errorf("connection attempt to %v is pending", addr)
	}
-	s.pendingConnMtx.RUnlock()
 
 	// If there's not already a pending or active connection to this node,
 	// then instruct the connection manager to attempt to establish a
 	// persistent connection to the peer.
 	srvrLog.Debugf("Connecting to %v", addr)
-	if msg.persistent {
+	if perm {
 		connReq := &connmgr.ConnReq{
 			Addr:      addr,
 			Permanent: true,
 		}
 
-		s.pendingConnMtx.Lock()
 		s.persistentPeers[targetPub] = struct{}{}
-		s.persistentConnReqs[targetPub] = append(s.persistentConnReqs[targetPub],
-			connReq)
-		s.pendingConnMtx.Unlock()
+		s.persistentConnReqs[targetPub] = append(
+			s.persistentConnReqs[targetPub], connReq)
+		s.mu.Unlock()
 
 		go s.connMgr.Connect(connReq)
 
-		msg.err <- nil
-	} else {
-		// If we're not making a persistent connection, then we'll
-		// attempt to connect o the target peer, returning an error
-		// which indicates success of failure.
-		go func() {
-			// Attempt to connect to the remote node. If the we
-			// can't make the connection, or the crypto negotiation
-			// breaks down, then return an error to the caller.
-			conn, err := brontide.Dial(s.identityPriv, addr)
-			if err != nil {
-				msg.err <- err
-				return
-			}
-
-			s.outboundPeerConnected(nil, conn)
-			msg.err <- nil
-		}()
+		return nil
 	}
+	s.mu.Unlock()
+
+	// If we're not making a persistent connection, then we'll attempt to
+	// connect to the target peer. If we can't make the connection, or
+	// the crypto negotiation breaks down, then return an error to the
+	// caller.
+	conn, err := brontide.Dial(s.identityPriv, addr)
+	if err != nil {
+		return err
+	}
+
+	// Once the connection has been made, we can notify the server of the
+	// new connection via our public endpoint, which will reacquire the
+	// lock and add the peer to the server's internal state.
+	s.OutboundPeerConnected(nil, conn)
+
+	return nil
 }
 
-// handleDisconnectPeer attempts to disconnect one peer from another
-func (s *server) handleDisconnectPeer(msg *disconnectPeerMsg) {
-	pubBytes := msg.pubKey.SerializeCompressed()
+// DisconnectPeer sends the request to the server to close the connection with
+// the peer identified by the given public key.
+// NOTE: This function is safe for concurrent access.
+func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
+	pubBytes := pubKey.SerializeCompressed()
 	pubStr := string(pubBytes)
 
+	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	// Check that were actually connected to this peer. If not, then we'll
 	// exit in an error as we can't disconnect from a peer that we're not
 	// currently connected to.
-	s.peersMtx.RLock()
 	peer, ok := s.peersByPub[pubStr]
-	s.peersMtx.RUnlock()
 	if !ok {
-		msg.err <- fmt.Errorf("unable to find peer %x", pubBytes)
-		return
+		return fmt.Errorf("unable to find peer %x", pubBytes)
 	}
 
 	// If this peer was formerly a persistent connection, then we'll remove
 	// them from this map so we don't attempt to re-connect after we
 	// disconnect.
-	s.pendingConnMtx.Lock()
 	if _, ok := s.persistentPeers[pubStr]; ok {
 		delete(s.persistentPeers, pubStr)
 	}
-	s.pendingConnMtx.Unlock()
 
 	// Now that we know the peer is actually connected, we'll disconnect
-	// from the peer.
+	// from the peer. The lock is held until after Disconnect to ensure
+	// that the peer's `peerTerminationWatcher` has fully exited before
+	// returning to the caller.
 	srvrLog.Infof("Disconnecting from %v", peer)
-	peer.Disconnect(errors.New("received user command to disconnect the peer"))
+	peer.Disconnect(
+		errors.New("received user command to disconnect the peer"),
+	)
 
-	msg.err <- nil
+	return nil
 }
 
-// handleOpenChanReq first locates the target peer, and if found hands off the
-// request to the funding manager allowing it to initiate the channel funding
-// workflow.
-func (s *server) handleOpenChanReq(req *openChanReq) {
+// OpenChannel sends a request to the server to open a channel to the specified
+// peer identified by ID with the passed channel funding parameters.
+// NOTE: This function is safe for concurrent access.
+func (s *server) OpenChannel(
+	peerID int32,
+	nodeKey *btcec.PublicKey,
+	localAmt btcutil.Amount,
+	pushAmt btcutil.Amount) (chan *lnrpc.OpenStatusUpdate, chan error) {
+
+	updateChan := make(chan *lnrpc.OpenStatusUpdate, 1)
+	errChan := make(chan error, 1)
+
 	var (
 		targetPeer  *peer
 		pubKeyBytes []byte
@@ -1229,24 +1213,24 @@
 	)
 
 	// If the user is targeting the peer by public key, then we'll need to
 	// convert that into a string for our map. Otherwise, we expect them to
 	// target by peer ID instead.
-	if req.targetPubkey != nil {
-		pubKeyBytes = req.targetPubkey.SerializeCompressed()
+	if nodeKey != nil {
+		pubKeyBytes = nodeKey.SerializeCompressed()
 	}
 
 	// First attempt to locate the target peer to open a channel with, if
 	// we're unable to locate the peer then this request will fail.
-	s.peersMtx.RLock()
-	if peer, ok := s.peersByID[req.targetPeerID]; ok {
+	s.mu.Lock()
+	if peer, ok := s.peersByID[peerID]; ok {
 		targetPeer = peer
 	} else if peer, ok := s.peersByPub[string(pubKeyBytes)]; ok {
 		targetPeer = peer
 	}
-	s.peersMtx.RUnlock()
+	s.mu.Unlock()
 
 	if targetPeer == nil {
-		req.err <- fmt.Errorf("unable to find peer nodeID(%x), "+
-			"peerID(%v)", pubKeyBytes, req.targetPeerID)
-		return
+		errChan <- fmt.Errorf("unable to find peer nodeID(%x), "+
+			"peerID(%v)", pubKeyBytes, peerID)
+		return updateChan, errChan
 	}
 
 	// Spawn a goroutine to send the funding workflow request to the
@@ -1255,48 +1239,6 @@
 	// funding manager. This allows the server to continue handling queries
 	// instead of blocking on this request which is exporter as a
 	// synchronous request to the outside world.
 	// TODO(roasbeef): pass in chan that's closed if/when funding succeeds
 	// so can track as persistent peer?
-	go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req)
-}
-
-// ConnectToPeer requests that the server connect to a Lightning Network peer
-// at the specified address. This function will *block* until either a
-// connection is established, or the initial handshake process fails.
-func (s *server) ConnectToPeer(addr *lnwire.NetAddress,
-	perm bool) error {
-
-	errChan := make(chan error, 1)
-
-	s.queries <- &connectPeerMsg{
-		addr:       addr,
-		persistent: perm,
-		err:        errChan,
-	}
-
-	return <-errChan
-}
-
-// DisconnectPeer sends the request to server to close the connection with peer
-// identified by public key.
-func (s *server) DisconnectPeer(pubKey *btcec.PublicKey) error {
-
-	errChan := make(chan error, 1)
-
-	s.queries <- &disconnectPeerMsg{
-		pubKey: pubKey,
-		err:    errChan,
-	}
-
-	return <-errChan
-}
-
-// OpenChannel sends a request to the server to open a channel to the specified
-// peer identified by ID with the passed channel funding parameters.
-func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey,
-	localAmt, pushAmt btcutil.Amount) (chan *lnrpc.OpenStatusUpdate, chan error) {
-
-	errChan := make(chan error, 1)
-	updateChan := make(chan *lnrpc.OpenStatusUpdate, 1)
-
 	req := &openChanReq{
 		targetPeerID: peerID,
 		targetPubkey: nodeKey,
@@ -1307,21 +1249,21 @@ func (s *server) OpenChannel(peerID int32, nodeKey *btcec.PublicKey,
 		err:          errChan,
 	}
 
-	s.queries <- req
+	go s.fundingMgr.initFundingWorkflow(targetPeer.addr, req)
 
 	return updateChan, errChan
 }
 
 // Peers returns a slice of all active peers.
+// NOTE: This function is safe for concurrent access.
 func (s *server) Peers() []*peer {
-	s.peersMtx.RLock()
+	s.mu.Lock()
+	defer s.mu.Unlock()
 
 	peers := make([]*peer, 0, len(s.peersByID))
 	for _, peer := range s.peersByID {
 		peers = append(peers, peer)
 	}
-	s.peersMtx.RUnlock()
-
 	return peers
 }
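
A note on the broadcast synchronization introduced above: sendPeerMessages pairs a per-broadcast sync.WaitGroup with the server-level WaitGroup, so BroadcastMessage can wait for its own dispatches while Stop can wait for every outstanding goroutine. The following is a minimal, self-contained sketch of that pattern (not part of the diff; all names here are illustrative only):

package main

import (
	"fmt"
	"sync"
)

type srv struct {
	wg sync.WaitGroup // server-level; shutdown waits on this
}

// sendPeerMessages mirrors the pattern above: a non-nil wg means we were
// spawned as a goroutine and must signal both WaitGroups on exit.
func (s *srv) sendPeerMessages(peer string, msgs []string, wg *sync.WaitGroup) {
	if wg != nil {
		defer s.wg.Done()
		defer wg.Done()
	}
	for _, m := range msgs {
		fmt.Printf("enqueue %q -> %s\n", m, peer)
	}
}

func (s *srv) broadcast(peers, msgs []string) {
	var wg sync.WaitGroup // broadcast-level; this call waits on it
	for _, p := range peers {
		wg.Add(1)
		s.wg.Add(1)
		go s.sendPeerMessages(p, msgs, &wg)
	}
	wg.Wait() // every message for *this* broadcast has been enqueued
}

func main() {
	s := &srv{}
	s.broadcast([]string{"alice", "bob"}, []string{"ping"})
	s.wg.Wait() // shutdown path: all dispatch goroutines have exited
}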
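
The shouldDropLocalConnection tie-breaker resolves simultaneous dials deterministically: both sides keep the connection initiated by the node with the lexicographically smaller compressed pubkey, so exactly one side drops its own dial. A hedged sketch (hypothetical key bytes, standalone function) showing the two sides reaching opposite decisions:

package main

import (
	"bytes"
	"fmt"
)

// shouldDropLocal mimics shouldDropLocalConnection: our own dial is dropped
// iff our compressed pubkey compares "greater" than the remote's.
func shouldDropLocal(local, remote []byte) bool {
	return bytes.Compare(local, remote) > 0
}

func main() {
	alice := []byte{0x02, 0x11} // the "smaller" key
	bob := []byte{0x02, 0x22}   // the "greater" key

	// The two nodes reach opposite decisions, so exactly one of the two
	// concurrent connections survives.
	fmt.Println(shouldDropLocal(alice, bob)) // false: Alice keeps her dial
	fmt.Println(shouldDropLocal(bob, alice)) // true:  Bob drops his dial
}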
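
Finally, the reworked LocalChannelClose callback races the request send against the peer's quit channel, so delivery can never block forever on a peer that is shutting down. A minimal sketch of that select pattern; the buffered request channel and the deliver helper here are purely illustrative:

package main

import (
	"errors"
	"fmt"
)

// deliver races a request send against the peer's quit channel, returning an
// error instead of blocking once the peer has begun shutting down.
func deliver(reqs chan string, quit chan struct{}, req string) error {
	select {
	case reqs <- req:
		return nil
	case <-quit:
		return errors.New("peer shutting down")
	}
}

func main() {
	reqs := make(chan string, 1) // buffered for the sake of the demo
	quit := make(chan struct{})

	fmt.Println(deliver(reqs, quit, "close")) // <nil>: buffered send wins

	close(quit)
	// The buffer is still full, so only the quit case is ready.
	fmt.Println(deliver(reqs, quit, "close again")) // peer shutting down
}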