diff --git a/lnd_test.go b/lnd_test.go index 0536181f..dd1d5472 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1991,7 +1991,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { closedChan := graphUpdate.ClosedChans[0] if closedChan.ClosedHeight != uint32(blockHeight+1) { t.Fatalf("close heights of channel mismatch: expected "+ - "%v, got v", blockHeight+1, closedChan.ClosedHeight) + "%v, got %v", blockHeight+1, closedChan.ClosedHeight) } if !bytes.Equal(closedChan.ChanPoint.FundingTxid, chanPoint.FundingTxid) { diff --git a/networktest.go b/networktest.go index 3241acba..04e21b1b 100644 --- a/networktest.go +++ b/networktest.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "encoding/hex" "fmt" "io" @@ -19,8 +20,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/grpclog" - "bytes" - "os/exec" "github.com/go-errors/errors" @@ -100,6 +99,11 @@ type lightningNode struct { extraArgs []string + chanWatchRequests chan *chanWatchRequest + + quit chan struct{} + wg sync.WaitGroup + lnrpc.LightningClient } @@ -129,13 +133,15 @@ func newLightningNode(rpcConfig *btcrpcclient.ConnConfig, lndArgs []string) (*li numActiveNodes++ return &lightningNode{ - cfg: cfg, - p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), - rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), - rpcCert: rpcConfig.Certificates, - nodeID: nodeNum, - processExit: make(chan struct{}), - extraArgs: lndArgs, + cfg: cfg, + p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), + rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), + rpcCert: rpcConfig.Certificates, + nodeID: nodeNum, + chanWatchRequests: make(chan *chanWatchRequest), + processExit: make(chan struct{}), + quit: make(chan struct{}), + extraArgs: lndArgs, }, nil } @@ -162,10 +168,10 @@ func (l *lightningNode) genArgs() []string { return args } -// start launches a new process running lnd. Additionally, the PID of the +// Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. -func (l *lightningNode) start(lndError chan error) error { +func (l *lightningNode) Start(lndError chan error) error { args := l.genArgs() l.cmd = exec.Command("lnd", args...) @@ -231,6 +237,11 @@ func (l *lightningNode) start(lndError chan error) error { } copy(l.PubKey[:], pubkey) + // Launch the watcher that'll hook into graph related topology change + // from the PoV of this node. + l.wg.Add(1) + go l.lightningNetworkWatcher() + return nil } @@ -250,8 +261,8 @@ func (l *lightningNode) cleanup() error { return err } -// stop attempts to stop the active lnd process. -func (l *lightningNode) stop() error { +// Stop attempts to stop the active lnd process. +func (l *lightningNode) Stop() error { // We should skip node stop in case: // - start of the node wasn't initiated // - process wasn't spawned @@ -261,6 +272,9 @@ func (l *lightningNode) stop() error { case <-l.processExit: return nil default: + close(l.quit) + l.wg.Wait() + if runtime.GOOS == "windows" { return l.cmd.Process.Signal(os.Kill) } @@ -268,20 +282,22 @@ func (l *lightningNode) stop() error { } } -// restart attempts to restart a lightning node by shutting it down cleanly, +// Restart attempts to restart a lightning node by shutting it down cleanly, // then restarting the process. This function is fully blocking. Upon restart, // the RPC connection to the node will be re-attempted, continuing iff the // connection attempt is successful. Additionally, if a callback is passed, the // closure will be executed after the node has been shutdown, but before the // process has been started up again. -func (l *lightningNode) restart(errChan chan error, callback func() error) error { - if err := l.stop(); err != nil { +func (l *lightningNode) Restart(errChan chan error, callback func() error) error { + if err := l.Stop(); err != nil { return nil } <-l.processExit l.processExit = make(chan struct{}) + l.quit = make(chan struct{}) + l.wg = sync.WaitGroup{} if callback != nil { if err := callback(); err != nil { @@ -289,13 +305,13 @@ func (l *lightningNode) restart(errChan chan error, callback func() error) error } } - return l.start(errChan) + return l.Start(errChan) } -// shutdown stops the active lnd process and clean up any temporary directories +// Shutdown stops the active lnd process and clean up any temporary directories // created along the way. -func (l *lightningNode) shutdown() error { - if err := l.stop(); err != nil { +func (l *lightningNode) Shutdown() error { + if err := l.Stop(); err != nil { return err } if err := l.cleanup(); err != nil { @@ -304,6 +320,220 @@ func (l *lightningNode) shutdown() error { return nil } +// closeChanWatchRequest is a request to the lightningNetworkWatcher to be +// notified once it's detected within the test Lightning Network, that a +// channel has either been added or closed. +type chanWatchRequest struct { + chanPoint wire.OutPoint + + chanOpen bool + + eventChan chan struct{} +} + +// lightningNetworkWatcher is a goroutine which is able to dispatch +// notifications once it has been observed that a target channel has been +// closed or opened within the network. In order to dispatch these +// notifications, the GraphTopologySubscription client exposed as part of the +// gRPC interface is used. +func (l *lightningNode) lightningNetworkWatcher() { + defer l.wg.Done() + + graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) + go func() { + ctxb := context.Background() + req := &lnrpc.GraphTopologySubscription{} + topologyClient, err := l.SubscribeChannelGraph(ctxb, req) + if err != nil { + // We panic here in case of an error as failure to + // create the topology client will cause all subsequent + // tests to fail. + panic(fmt.Errorf("unable to create topology "+ + "client: %v", err)) + } + + for { + update, err := topologyClient.Recv() + if err == io.EOF { + return + } else if err != nil { + // Similar to the case above, we also panic + // here (and end the tests) as these + // notifications are critical to the success of + // many tests. + panic(fmt.Errorf("unable read update ntfn: %v", err)) + } + + graphUpdates <- update + } + }() + + // For each outpoint, we'll track an integer which denotes the number + // of edges seen for that channel within the network. When this number + // reaches 2, then it means that both edge advertisements has + // propagated through the network. + openChans := make(map[wire.OutPoint]int) + openClients := make(map[wire.OutPoint][]chan struct{}) + + closedChans := make(map[wire.OutPoint]struct{}) + closeClients := make(map[wire.OutPoint][]chan struct{}) + + for { + select { + + // A new graph update has just been received, so we'll examine + // the current set of registered clients to see if we can + // dispatch any requests. + case graphUpdate := <-graphUpdates: + // For each new channel, we'll increment the number of + // edges seen by one. + for _, newChan := range graphUpdate.ChannelUpdates { + txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) + op := wire.OutPoint{ + Hash: *txid, + Index: newChan.ChanPoint.OutputIndex, + } + openChans[op]++ + + // For this new channel, if the number of edges + // seen is less than two, then the channel + // hasn't been fully announced yet. + if numEdges := openChans[op]; numEdges < 2 { + continue + } + + // Otherwise, we'll notify all the registered + // clients and remove the dispatched clients. + for _, eventChan := range openClients[op] { + close(eventChan) + } + delete(openClients, op) + } + + // For each channel closed, we'll mark that we've + // detected a channel closure while lnd was pruning the + // channel graph. + for _, closedChan := range graphUpdate.ClosedChans { + txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) + op := wire.OutPoint{ + Hash: *txid, + Index: closedChan.ChanPoint.OutputIndex, + } + closedChans[op] = struct{}{} + + // As the channel has been closed, we'll notify + // all register clients. + for _, eventChan := range closeClients[op] { + close(eventChan) + } + delete(closeClients, op) + } + + // A new watch request, has just arrived. We'll either be able + // to dispatch immediately, or need to add the client for + // processing later. + case watchRequest := <-l.chanWatchRequests: + targetChan := watchRequest.chanPoint + + // TODO(roasbeef): add update type also, checks for + // multiple of 2 + if watchRequest.chanOpen { + // If this is a open request, then it can be + // dispatched if the number of edges seen for + // the channel is at least two. + if numEdges, _ := openChans[targetChan]; numEdges >= 2 { + close(watchRequest.eventChan) + continue + } + + // Otherwise, we'll add this to the list of + // watch open clients for this out point. + openClients[targetChan] = append(openClients[targetChan], + watchRequest.eventChan) + continue + } + + // If this is a close request, then it can be + // immediately dispatched if we've already seen a + // channel closure for this channel. + if _, ok := closedChans[targetChan]; ok { + close(watchRequest.eventChan) + continue + } + + // Otherwise, we'll add this to the list of close watch + // clients for this out point. + closeClients[targetChan] = append(closeClients[targetChan], + watchRequest.eventChan) + + case <-l.quit: + return + } + } +} + +// WaitForNetworkChannelOpen will block until a channel with the target +// outpoint is seen as being fully advertised within the network. A channel is +// considered "fully advertised" once both of its directional edges has been +// advertised within the test Lightning Network. +func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, + op *lnrpc.ChannelPoint) error { + + eventChan := make(chan struct{}) + + txid, err := chainhash.NewHash(op.FundingTxid) + if err != nil { + return err + } + + l.chanWatchRequests <- &chanWatchRequest{ + chanPoint: wire.OutPoint{ + Hash: *txid, + Index: op.OutputIndex, + }, + eventChan: eventChan, + chanOpen: true, + } + + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel not opened before timeout") + } +} + +// WaitForNetworkChannelClose will block until a channel with the target +// outpoint is seen as closed within the network. A channel is considered +// closed once a transaction spending the funding outpoint is seen within a +// confirmed block. +func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, + op *lnrpc.ChannelPoint) error { + + eventChan := make(chan struct{}) + + txid, err := chainhash.NewHash(op.FundingTxid) + if err != nil { + return err + } + + l.chanWatchRequests <- &chanWatchRequest{ + chanPoint: wire.OutPoint{ + Hash: *txid, + Index: op.OutputIndex, + }, + eventChan: eventChan, + chanOpen: false, + } + + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel not closed before timeout") + } +} + // networkHarness is an integration testing harness for the lightning network. // The harness by default is created with two active nodes on the network: // Alice and Bob. @@ -322,8 +552,6 @@ type networkHarness struct { seenTxns chan chainhash.Hash bitcoinWatchRequests chan *txWatchRequest - chanWatchRequests chan *chanWatchRequest - // Channel for transmitting stderr output from failed lightning node // to main process. lndErrorChan chan error @@ -340,7 +568,6 @@ func newNetworkHarness() (*networkHarness, error) { activeNodes: make(map[int]*lightningNode), seenTxns: make(chan chainhash.Hash), bitcoinWatchRequests: make(chan *txWatchRequest), - chanWatchRequests: make(chan *chanWatchRequest), lndErrorChan: make(chan error), }, nil } @@ -406,7 +633,7 @@ func (n *networkHarness) SetUp() error { go func() { var err error defer wg.Done() - if err = n.Alice.start(n.lndErrorChan); err != nil { + if err = n.Alice.Start(n.lndErrorChan); err != nil { errChan <- err return } @@ -414,7 +641,7 @@ func (n *networkHarness) SetUp() error { go func() { var err error defer wg.Done() - if err = n.Bob.start(n.lndErrorChan); err != nil { + if err = n.Bob.Start(n.lndErrorChan); err != nil { errChan <- err return } @@ -505,7 +732,7 @@ out: // TearDownAll tears down all active nodes within the test lightning network. func (n *networkHarness) TearDownAll() error { for _, node := range n.activeNodes { - if err := node.shutdown(); err != nil { + if err := node.Shutdown(); err != nil { return err } } @@ -525,7 +752,7 @@ func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { return nil, err } - if err := node.start(n.lndErrorChan); err != nil { + if err := node.Start(n.lndErrorChan); err != nil { return nil, err } @@ -563,7 +790,7 @@ func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) // and invalidated prior state, or persistent state recovery, simulating node // crashes, etc. func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error { - return node.restart(n.lndErrorChan, callback) + return node.Restart(n.lndErrorChan, callback) } // TODO(roasbeef): add a WithChannel higher-order function? @@ -651,186 +878,6 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash. } } -// closeChanWatchRequest is a request to the lightningNetworkWatcher to be -// notified once it's detected within the test Lightning Network, that a -// channel has either been added or closed. -type chanWatchRequest struct { - chanPoint wire.OutPoint - - chanOpen bool - - eventChan chan struct{} -} - -// lightningNetworkWatcher is a goroutine which is able to dispatch -// notifications once it has been observed that a target channel has been -// closed or opened within the network. In order to dispatch these -// notifications, the GraphTopologySubscription client exposed as part of the -// gRPC interface is used. -// -// TODO(roasbeef): allow caller to select target nodes to recv ntfn from? -func (n *networkHarness) lightningNetworkWatcher() { - graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) - go func() { - ctxb := context.Background() - req := &lnrpc.GraphTopologySubscription{} - topologyClient, err := n.Alice.SubscribeChannelGraph(ctxb, req) - if err != nil { - // We panic here in case of an error as failure to - // create the topology client will cause all subsequent - // tests to fail. - panic(fmt.Errorf("unable to create topology "+ - "client: %v", err)) - } - - graphUpdate, err := topologyClient.Recv() - if err == io.EOF { - return - } else if err != nil { - } - - graphUpdates <- graphUpdate - }() - - // For each outpoint, we'll track an integer which denotes the number - // of edges seen for that channel within the network. When this number - // reaches 2, then it means that both edge advertisements has - // propagated through the network. - openChans := make(map[wire.OutPoint]int) - openClients := make(map[wire.OutPoint][]chan struct{}) - - closedChans := make(map[wire.OutPoint]struct{}) - closeClients := make(map[wire.OutPoint][]chan struct{}) - - for { - select { - - // A new graph update has just been received, so we'll examine - // the current set of registered clients to see if we can - // dispatch any requests. - case graphUpdate := <-graphUpdates: - - // For each new channel, we'll increment the number of - // edges seen by one. - for _, newChan := range graphUpdate.ChannelUpdates { - txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: newChan.ChanPoint.OutputIndex, - } - openChans[op] += 1 - } - - // For each channel closed, we'll mark that we've - // detected a channel closure while lnd was pruning the - // channel graph. - for _, closedChan := range graphUpdate.ClosedChans { - txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: closedChan.ChanPoint.OutputIndex, - } - closedChans[op] = struct{}{} - } - - // A new watch request, has just arrived. We'll either be able - // to dispatch immediately, or need to add the client for - // processing later. - case watchRequest := <-n.chanWatchRequests: - targetChan := watchRequest.chanPoint - - if watchRequest.chanOpen { - // If this is a open request, then it can be - // dispatched if the number of edges seen for - // the channel is at least two. - if numEdges, _ := openChans[targetChan]; numEdges >= 2 { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of - // watch open clients for this out point. - openClients[targetChan] = append(openClients[targetChan], - watchRequest.eventChan) - } - - // If this is a close request, then it can be - // immediately dispatched if we've already seen a - // channel closure for this channel. - if _, ok := closedChans[targetChan]; ok { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of close watch - // clients for this out point. - closeClients[targetChan] = append(closeClients[targetChan], - watchRequest.eventChan) - } - - } -} - -// WaitForNetworkChannelOpen will block until a channel with the target -// outpoint is seen as being fully advertised within the network. A channel is -// considered "fully advertised" once both of its directional edges has been -// advertised within the test Lightning Network. -func (n *networkHarness) WaitForNetworkChannelOpen(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - n.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - chanOpen: true, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not opened before timeout") - } -} - -// WaitForNetworkChannelClose will block until a channel with the target -// outpoint is seen as closed within the network. A channel is considered -// closed once a transaction spending the funding outpoint is seen within a -// confirmed block. -func (n *networkHarness) WaitForNetworkChannelClose(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - n.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - chanOpen: false, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not closed before timeout") - } -} - // OpenChannel attempts to open a channel between srcNode and destNode with the // passed channel funding parameters. If the passed context has a timeout, then // if the timeout is reached before the channel pending notification is diff --git a/rpcserver.go b/rpcserver.go index 9839b3c1..7d8d4d82 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1722,8 +1722,6 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription, return nil } } - - return nil } // marshallTopologyChange performs a mapping from the topology change sturct