From 9ef7e23384c882a275204b0b9a72399ce734f312 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 21 Jun 2016 11:52:09 -0700 Subject: [PATCH] lnd: implement connectpeer and listpeers RPC calls --- lnrpc/rpc.proto | 68 ++++++++- rpcserver.go | 75 ++++++++-- server.go | 378 +++++++++++++++++++++++++++++++----------------- 3 files changed, 377 insertions(+), 144 deletions(-) diff --git a/lnrpc/rpc.proto b/lnrpc/rpc.proto index 7cbebff4..b2c246c4 100644 --- a/lnrpc/rpc.proto +++ b/lnrpc/rpc.proto @@ -7,13 +7,27 @@ service Lightning { rpc NewAddress(NewAddressRequest) returns (NewAddressResponse); rpc ConnectPeer(ConnectPeerRequest) returns (ConnectPeerResponse); + rpc ListPeers(ListPeersRequest) returns (ListPeersResponse); + + rpc OpenChannel(OpenChannelRequest) returns (OpenChannelResponse); + rpc CloseChannel(CloseChannelRequest) returns (CloseChannelResponse); + rpc WalletBalance(WalletBalanceRequest) returns (WalletBalanceResponse); } +message ChannelPoint { + bytes funding_txid = 1; + uint32 output_index = 2; +} + +message LightningAddress { + string pubKeyHash = 1; + string host = 2; +} + message SendManyRequest { map AddrToAmount = 1; } - message SendManyResponse { string txid = 1; } @@ -32,11 +46,57 @@ message NewAddressResponse { } message ConnectPeerRequest { - string idAtHost = 1; + LightningAddress addr = 1; +} +message ConnectPeerResponse { + int32 peer_id = 1; } -message ConnectPeerResponse { - bytes lnID = 1; +message HTLC { + int64 id = 1; + + int64 amount = 2; + + bytes hash_lock = 3; + + bool to_us = 4; +} + +message ActiveChannel { + bytes funding_txid = 1; + + int64 capacity = 2; + int64 local_balance = 3; + int64 remote_balance = 4; + + int64 unsettled_belance = 5; + repeated HTLC pending_htlcs = 6; + + int64 num_updates = 7; + // TODO(roasbeef): other stuffs +} + +message Peer { + string lightning_id = 1; + int32 peer_id = 2; + string address = 3; + + uint64 bytes_sent = 4; + uint64 bytes_recv = 5; + + int64 sat_sent = 6; + int64 sat_recv = 7; + + bool inbound = 8; + + // TODO(roasbeef): add pending channels + repeated ActiveChannel channels = 9; +} + +message ListPeersRequest {} +message ListPeersResponse { + repeated Peer peers = 1; +} message WalletBalanceRequest { bool witness_only = 1; } diff --git a/rpcserver.go b/rpcserver.go index 614d8a93..ea536cc4 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1,6 +1,7 @@ package main import ( + "encoding/hex" "fmt" "sync" @@ -19,7 +20,7 @@ var ( defaultAccount uint32 = waddrmgr.DefaultAccountNum ) -// rpcServer... +// rpcServer is a gRPC, RPC front end to the lnd daemon. type rpcServer struct { started int32 // To be used atomically. shutdown int32 // To be used atomically. @@ -31,14 +32,17 @@ type rpcServer struct { quit chan struct{} } +// A compile time check to ensure that rpcServer fully implements the +// LightningServer gRPC service. var _ lnrpc.LightningServer = (*rpcServer)(nil) -// newRpcServer... +// newRpcServer creates and returns a new instance of the rpcServer. func newRpcServer(s *server) *rpcServer { return &rpcServer{server: s, quit: make(chan struct{}, 1)} } -// Start... +// Start launches any helper goroutines required for the rpcServer +// to function. func (r *rpcServer) Start() error { if atomic.AddInt32(&r.started, 1) != 1 { return nil @@ -47,7 +51,7 @@ func (r *rpcServer) Start() error { return nil } -// Stop... +// Stop signals any active goroutines for a graceful closure. func (r *rpcServer) Stop() error { if atomic.AddInt32(&r.shutdown, 1) != 1 { return nil @@ -58,8 +62,10 @@ func (r *rpcServer) Stop() error { return nil } -// SendMany... -func (r *rpcServer) SendMany(ctx context.Context, in *lnrpc.SendManyRequest) (*lnrpc.SendManyResponse, error) { +// SendMany handles a request for a transaction create multiple specified +// outputs in parallel. +func (r *rpcServer) SendMany(ctx context.Context, + in *lnrpc.SendManyRequest) (*lnrpc.SendManyResponse, error) { outputs := make([]*wire.TxOut, 0, len(in.AddrToAmount)) for addr, amt := range in.AddrToAmount { @@ -76,6 +82,8 @@ func (r *rpcServer) SendMany(ctx context.Context, in *lnrpc.SendManyRequest) (*l outputs = append(outputs, wire.NewTxOut(amt, pkscript)) } + // Instruct the wallet to create an transaction paying to the specified + // outputs, selecting any coins with at least one confirmation. txid, err := r.server.lnwallet.SendOutputs(outputs, defaultAccount, 1) if err != nil { return nil, err @@ -86,7 +94,7 @@ func (r *rpcServer) SendMany(ctx context.Context, in *lnrpc.SendManyRequest) (*l return &lnrpc.SendManyResponse{Txid: txid.String()}, nil } -// NewAddress... +// NewAddress creates a new address under control of the local wallet. func (r *rpcServer) NewAddress(ctx context.Context, in *lnrpc.NewAddressRequest) (*lnrpc.NewAddressResponse, error) { @@ -115,15 +123,32 @@ func (r *rpcServer) NewAddress(ctx context.Context, return &lnrpc.NewAddressResponse{Address: addr.String()}, nil } -// LNConnect... +// ConnectPeer attempts to establish a connection to a remote peer. func (r *rpcServer) ConnectPeer(ctx context.Context, in *lnrpc.ConnectPeerRequest) (*lnrpc.ConnectPeerResponse, error) { - if len(in.IdAtHost) == 0 { + if in.Addr == nil { return nil, fmt.Errorf("need: lnc pubkeyhash@hostname") } - peerAddr, err := lndc.LnAddrFromString(in.IdAtHost) + idAtHost := fmt.Sprintf("%v@%v", in.Addr.PubKeyHash, in.Addr.Host) + rpcsLog.Debugf("Attempting to connect to peer %v", idAtHost) + + peerAddr, err := lndc.LnAddrFromString(idAtHost) + if err != nil { + rpcsLog.Errorf("(connectpeer): error parsing ln addr: %v", err) + return nil, err + } + + peerID, err := r.server.ConnectToPeer(peerAddr) + if err != nil { + rpcsLog.Errorf("(connectpeer): error connecting to peer: %v", err) + return nil, err + } + + rpcsLog.Debugf("Connected to peer: %v", peerAddr.String()) + return &lnrpc.ConnectPeerResponse{peerID}, nil +} if err != nil { return nil, err } @@ -134,6 +159,36 @@ func (r *rpcServer) ConnectPeer(ctx context.Context, rpcsLog.Infof("Connected to peer: %v", peerAddr.String()) return &lnrpc.ConnectPeerResponse{[]byte(peerAddr.String())}, nil +// ListPeers returns a verbose listing of all currently active peers. +func (r *rpcServer) ListPeers(ctx context.Context, + in *lnrpc.ListPeersRequest) (*lnrpc.ListPeersResponse, error) { + + rpcsLog.Tracef("recieved listpeers request") + + serverPeers := r.server.Peers() + resp := &lnrpc.ListPeersResponse{ + Peers: make([]*lnrpc.Peer, 0, len(serverPeers)), + } + + for _, serverPeer := range serverPeers { + // TODO(roasbeef): add a snapshot method which grabs peer read mtx + peer := &lnrpc.Peer{ + LightningId: hex.EncodeToString(serverPeer.lightningID[:]), + PeerId: serverPeer.id, + Address: serverPeer.conn.RemoteAddr().String(), + Inbound: serverPeer.inbound, + BytesRecv: atomic.LoadUint64(&serverPeer.bytesReceived), + BytesSent: atomic.LoadUint64(&serverPeer.bytesSent), + } + + resp.Peers = append(resp.Peers, peer) + } + + rpcsLog.Tracef("listpeers yielded %v peers", serverPeers) + + return resp, nil +} + // WalletBalance returns the sum of all confirmed unspent outputs under control // by the wallet. This method can be modified by having the request specify // only witness outputs should be factored into the final output sum. diff --git a/server.go b/server.go index fe67771c..d71c4547 100644 --- a/server.go +++ b/server.go @@ -11,23 +11,37 @@ import ( "github.com/lightningnetwork/lnd/lndc" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/btcec" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" "github.com/roasbeef/btcwallet/waddrmgr" ) -// server... +// server is the main server of the Lightning Network Daemon. The server +// houses global state pertianing to the wallet, database, and the rpcserver. +// Additionally, the server is also used as a central messaging bus to interact +// with any of its companion objects. type server struct { started int32 // atomic shutdown int32 // atomic - longTermPriv *btcec.PrivateKey + // identityPriv is the private key used to authenticate any incoming + // connections. + identityPriv *btcec.PrivateKey listeners []net.Listener peers map[int32]*peer + chanIndexMtx sync.RWMutex + chanIndex map[wire.OutPoint]*peer + rpcServer *rpcServer - lnwallet *lnwallet.LightningWallet - chanDB *channeldb.DB + // TODO(roasbeef): add chan notifier also + lnwallet *lnwallet.LightningWallet + + // TODO(roasbeef): add to constructor + fundingMgr *fundingManager + chanDB *channeldb.DB newPeers chan *peer donePeers chan *peer @@ -37,7 +51,8 @@ type server struct { quit chan struct{} } -// newServer... +// newServer creates a new instance of the server which is to listen using the +// passed listener address. func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet, chanDB *channeldb.DB) (*server, error) { @@ -56,12 +71,14 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet, s := &server{ chanDB: chanDB, - longTermPriv: privKey, + fundingMgr: newFundingManager(wallet), + lnwallet: wallet, + identityPriv: privKey, listeners: listeners, peers: make(map[int32]*peer), + chanIndex: make(map[wire.OutPoint]*peer), newPeers: make(chan *peer, 100), donePeers: make(chan *peer, 100), - lnwallet: wallet, queries: make(chan interface{}), quit: make(chan struct{}), } @@ -71,26 +88,65 @@ func newServer(listenAddrs []string, wallet *lnwallet.LightningWallet, return s, nil } -// addPeer... -func (s *server) addPeer(p *peer) { - if p == nil { +// Start starts the main daemon server, all requested listeners, and any helper +// goroutines. +func (s *server) Start() { + // Already running? + if atomic.AddInt32(&s.started, 1) != 1 { return } - // Ignore new peers if we're shutting down. - if atomic.LoadInt32(&s.shutdown) != 0 { - p.Stop() - return + // Start all the listeners. + for _, l := range s.listeners { + s.wg.Add(1) + go s.listener(l) } - s.peers[p.peerId] = p + s.fundingMgr.Start() + + s.wg.Add(2) + go s.peerManager() + go s.queryHandler() } -// removePeer... -func (s *server) removePeer(p *peer) { +// Stop gracefully shutsdown the main daemon server. This function will signal +// any active goroutines, or helper objects to exit, then blocks until they've +// all successfully exited. Additionally, any/all listeners are closed. +func (s *server) Stop() error { + // Bail if we're already shutting down. + if atomic.AddInt32(&s.shutdown, 1) != 1 { + return nil + } + + // Stop all the listeners. + for _, listener := range s.listeners { + if err := listener.Close(); err != nil { + return err + } + } + + // Shutdown the wallet, funding manager, and the rpc server. + s.rpcServer.Stop() + s.lnwallet.Shutdown() + s.fundingMgr.Stop() + + // Signal all the lingering goroutines to quit. + close(s.quit) + s.wg.Wait() + + return nil } -// peerManager... +// WaitForShutdown blocks all goroutines have been stopped. +func (s *server) WaitForShutdown() { + s.wg.Wait() +} + +// peerManager handles any requests to modify the server's internal state of +// all active peers. Additionally, any queries directed at peers will be +// handled by this goroutine. +// +// NOTE: This MUST be run as a goroutine. func (s *server) peerManager() { out: for { @@ -108,67 +164,68 @@ out: s.wg.Done() } -// connectPeerMsg... -type connectPeerMsg struct { - addr *lndc.LNAdr - reply chan error +// addPeer adds the passed peer to the server's global state of all active +// peers. +func (s *server) addPeer(p *peer) { + if p == nil { + return + } + + // Ignore new peers if we're shutting down. + if atomic.LoadInt32(&s.shutdown) != 0 { + p.Stop() + return + } + + s.peers[p.id] = p } -// queryHandler... +// removePeer removes the passed peer from the server's state of all active +// peers. +func (s *server) removePeer(p *peer) { + if p == nil { + return + } + + // Ignore deleting peers if we're shutting down. + if atomic.LoadInt32(&s.shutdown) != 0 { + p.Stop() + return + } + + delete(s.peers, p.id) +} + +// connectPeerMsg is a message requesting the server to open a connection to a +// particular peer. This message also houses an error channel which will be +// used to report success/failure. +type connectPeerMsg struct { + addr *lndc.LNAdr + resp chan int32 + err chan error +} + +// listPeersMsg is a message sent to the server in order to obtain a listing +// of all currently active channels. +type listPeersMsg struct { + resp chan []*peer +} +// queryHandler is a a goroutine dedicated to handling an queries or requests +// to mutate the server's global state. +// +// NOTE: This MUST be run as a goroutine. func (s *server) queryHandler() { + // TODO(roabeef): consolidate with peerManager out: for { select { case query := <-s.queries: + // TODO(roasbeef): make all goroutines? switch msg := query.(type) { case *connectPeerMsg: - addr := msg.addr - - // Ensure we're not already connected to this - // peer. - for _, peer := range s.peers { - if peer.lightningAddr.String() == - addr.String() { - msg.reply <- fmt.Errorf( - "already connected to peer: %v", - peer.lightningAddr, - ) - } - } - - // Launch a goroutine to connect to the requested - // peer so we can continue to handle queries. - go func() { - // For the lndc crypto handshake, we - // either need a compressed pubkey, or a - // 20-byte pkh. - var remoteId []byte - if addr.PubKey == nil { - remoteId = addr.Base58Adr.ScriptAddress() - } else { - remoteId = addr.PubKey.SerializeCompressed() - } - - // Attempt to connect to the remote - // node. If the we can't make the - // connection, or the crypto negotation - // breaks down, then return an error to the - // caller. - ipAddr := addr.NetAddr.String() - conn := lndc.NewConn(nil) - if err := conn.Dial( - s.longTermPriv, ipAddr, remoteId); err != nil { - msg.reply <- err - } - - // Now that we've established a connection, - // create a peer, and it to the set of - // currently active peers. - peer := newPeer(conn, s) - s.newPeers <- peer - - msg.reply <- nil - }() + s.handleConnectPeer(msg) + case *listPeersMsg: + s.handleListPeers(msg) } case <-s.quit: break out @@ -178,21 +235,113 @@ out: s.wg.Done() } -// ConnectToPeer... -func (s *server) ConnectToPeer(addr *lndc.LNAdr) error { - reply := make(chan error, 1) +// handleListPeers... +func (s *server) handleListPeers(msg *listPeersMsg) { + peers := make([]*peer, 0, len(s.peers)) + for _, peer := range s.peers { + peers = append(peers, peer) + } - s.queries <- &connectPeerMsg{addr, reply} - - return <-reply + msg.resp <- peers } -// AddPeer... -func (s *server) AddPeer(p *peer) { - s.newPeers <- p +// handleConnectPeer attempts to establish a connection to the address enclosed +// within the passed connectPeerMsg. This function is *async*, a goroutine will +// be spawned in order to finish the request, and respond to the caller. +func (s *server) handleConnectPeer(msg *connectPeerMsg) { + addr := msg.addr + + // Ensure we're not already connected to this + // peer. + for _, peer := range s.peers { + if peer.lightningAddr.String() == + addr.String() { + msg.err <- fmt.Errorf( + "already connected to peer: %v", + peer.lightningAddr, + ) + msg.resp <- -1 + } + } + + // Launch a goroutine to connect to the requested + // peer so we can continue to handle queries. + // TODO(roasbeef): semaphore to limit the number of goroutines for + // async requests. + go func() { + // For the lndc crypto handshake, we + // either need a compressed pubkey, or a + // 20-byte pkh. + var remoteId []byte + if addr.PubKey == nil { + remoteId = addr.Base58Adr.ScriptAddress() + } else { + remoteId = addr.PubKey.SerializeCompressed() + } + + srvrLog.Debugf("connecting to %v", hex.EncodeToString(remoteId)) + // Attempt to connect to the remote + // node. If the we can't make the + // connection, or the crypto negotation + // breaks down, then return an error to the + // caller. + ipAddr := addr.NetAddr.String() + conn := lndc.NewConn(nil) + if err := conn.Dial( + s.identityPriv, ipAddr, remoteId); err != nil { + msg.err <- err + msg.resp <- -1 + return + } + + // Now that we've established a connection, + // create a peer, and it to the set of + // currently active peers. + peer, err := newPeer(conn, s, activeNetParams.Net, false) + if err != nil { + srvrLog.Errorf("unable to create peer %v", err) + msg.resp <- -1 + msg.err <- err + return + } + + peer.Start() + s.newPeers <- peer + + msg.resp <- peer.id + msg.err <- nil + }() +} + } + } -// listener... + return + } + + } + +} + + } + + for _, listener := range s.listeners { + } + + +// Peers returns a slice of all active peers. +func (s *server) Peers() []*peer { + resp := make(chan []*peer) + + s.queries <- &listPeersMsg{resp} + + return <-resp +} + +// listener is a goroutine dedicated to accepting in coming peer connections +// from the passed listener. +// +// NOTE: This MUST be run as a goroutine. func (s *server) listener(l net.Listener) { srvrLog.Infof("Server listening on %s", l.Addr()) for atomic.LoadInt32(&s.shutdown) == 0 { @@ -207,67 +356,41 @@ func (s *server) listener(l net.Listener) { } srvrLog.Tracef("New inbound connection from %v", conn.RemoteAddr()) - peer := newPeer(conn, s) - peer.Start() + peer, err := newPeer(conn, s, activeNetParams.Net, true) + if err != nil { + srvrLog.Errorf("unable to create peer: %v", err) + continue + } + peer.Start() s.newPeers <- peer } s.wg.Done() } -// Start... -func (s *server) Start() { - // Already running? - if atomic.AddInt32(&s.started, 1) != 1 { - return - } - - // Start all the listeners. - for _, l := range s.listeners { - s.wg.Add(1) - go s.listener(l) - } - - s.wg.Add(2) - go s.peerManager() - go s.queryHandler() -} - -// Stop... -func (s *server) Stop() error { - // Bail if we're already shutting down. - if atomic.AddInt32(&s.shutdown, 1) != 1 { - return nil - } - - // Stop all the listeners. - for _, listener := range s.listeners { - if err := listener.Close(); err != nil { - return err - } - } - - s.rpcServer.Stop() - s.lnwallet.Shutdown() - - // Signal all the lingering goroutines to quit. - close(s.quit) - return nil -} - // getIdentityPrivKey gets the identity private key out of the wallet DB. -func getIdentityPrivKey(c *channeldb.DB, w *lnwallet.LightningWallet) (*btcec.PrivateKey, error) { +func getIdentityPrivKey(c *channeldb.DB, + w *lnwallet.LightningWallet) (*btcec.PrivateKey, error) { + + // First retrieve the current identity address for this peer. adr, err := c.GetIdAdr() if err != nil { return nil, err } - ltndLog.Infof("got ID address: %s", adr.String()) + + // Using the ID address, request the private key coresponding to the + // address from the wallet's address manager. adr2, err := w.Manager.Address(adr) if err != nil { return nil, err } - ltndLog.Infof("pubkey: %v", hex.EncodeToString(adr2.(waddrmgr.ManagedPubKeyAddress).PubKey().SerializeCompressed())) + + serializedKey := adr2.(waddrmgr.ManagedPubKeyAddress).PubKey().SerializeCompressed() + keyEncoded := hex.EncodeToString(serializedKey) + ltndLog.Infof("identity address: %v", adr) + ltndLog.Infof("identity pubkey retrieved: %v", keyEncoded) + priv, err := adr2.(waddrmgr.ManagedPubKeyAddress).PrivKey() if err != nil { return nil, err @@ -275,8 +398,3 @@ func getIdentityPrivKey(c *channeldb.DB, w *lnwallet.LightningWallet) (*btcec.Pr return priv, nil } - -// WaitForShutdown blocks all goroutines have been stopped. -func (s *server) WaitForShutdown() { - s.wg.Wait() -}