From a179a3adbb71c76c5ee8cb7d97b8445e9fb1663a Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 14 Mar 2017 15:38:04 -0700 Subject: [PATCH] test: modify new network announcement hook to be in node level MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit modifies the two newly added network announcement hook stop be at the lightningNode level rather than on the level of the entire test framework. With this, callers are now able to better utilize the newly added RPC’s since they can target particular peers and wait for network messages to be processed rather then depending on a single node (Alice) for information about the announcements propagated within the network. --- lnd_test.go | 2 +- networktest.go | 463 +++++++++++++++++++++++++++---------------------- rpcserver.go | 2 - 3 files changed, 256 insertions(+), 211 deletions(-) diff --git a/lnd_test.go b/lnd_test.go index 0536181f..dd1d5472 100644 --- a/lnd_test.go +++ b/lnd_test.go @@ -1991,7 +1991,7 @@ func testGraphTopologyNotifications(net *networkHarness, t *harnessTest) { closedChan := graphUpdate.ClosedChans[0] if closedChan.ClosedHeight != uint32(blockHeight+1) { t.Fatalf("close heights of channel mismatch: expected "+ - "%v, got v", blockHeight+1, closedChan.ClosedHeight) + "%v, got %v", blockHeight+1, closedChan.ClosedHeight) } if !bytes.Equal(closedChan.ChanPoint.FundingTxid, chanPoint.FundingTxid) { diff --git a/networktest.go b/networktest.go index 3241acba..04e21b1b 100644 --- a/networktest.go +++ b/networktest.go @@ -1,6 +1,7 @@ package main import ( + "bytes" "encoding/hex" "fmt" "io" @@ -19,8 +20,6 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/grpclog" - "bytes" - "os/exec" "github.com/go-errors/errors" @@ -100,6 +99,11 @@ type lightningNode struct { extraArgs []string + chanWatchRequests chan *chanWatchRequest + + quit chan struct{} + wg sync.WaitGroup + lnrpc.LightningClient } @@ -129,13 +133,15 @@ func newLightningNode(rpcConfig *btcrpcclient.ConnConfig, lndArgs []string) (*li numActiveNodes++ return &lightningNode{ - cfg: cfg, - p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), - rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), - rpcCert: rpcConfig.Certificates, - nodeID: nodeNum, - processExit: make(chan struct{}), - extraArgs: lndArgs, + cfg: cfg, + p2pAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.PeerPort)), + rpcAddr: net.JoinHostPort("127.0.0.1", strconv.Itoa(cfg.RPCPort)), + rpcCert: rpcConfig.Certificates, + nodeID: nodeNum, + chanWatchRequests: make(chan *chanWatchRequest), + processExit: make(chan struct{}), + quit: make(chan struct{}), + extraArgs: lndArgs, }, nil } @@ -162,10 +168,10 @@ func (l *lightningNode) genArgs() []string { return args } -// start launches a new process running lnd. Additionally, the PID of the +// Start launches a new process running lnd. Additionally, the PID of the // launched process is saved in order to possibly kill the process forcibly // later. -func (l *lightningNode) start(lndError chan error) error { +func (l *lightningNode) Start(lndError chan error) error { args := l.genArgs() l.cmd = exec.Command("lnd", args...) @@ -231,6 +237,11 @@ func (l *lightningNode) start(lndError chan error) error { } copy(l.PubKey[:], pubkey) + // Launch the watcher that'll hook into graph related topology change + // from the PoV of this node. + l.wg.Add(1) + go l.lightningNetworkWatcher() + return nil } @@ -250,8 +261,8 @@ func (l *lightningNode) cleanup() error { return err } -// stop attempts to stop the active lnd process. -func (l *lightningNode) stop() error { +// Stop attempts to stop the active lnd process. +func (l *lightningNode) Stop() error { // We should skip node stop in case: // - start of the node wasn't initiated // - process wasn't spawned @@ -261,6 +272,9 @@ func (l *lightningNode) stop() error { case <-l.processExit: return nil default: + close(l.quit) + l.wg.Wait() + if runtime.GOOS == "windows" { return l.cmd.Process.Signal(os.Kill) } @@ -268,20 +282,22 @@ func (l *lightningNode) stop() error { } } -// restart attempts to restart a lightning node by shutting it down cleanly, +// Restart attempts to restart a lightning node by shutting it down cleanly, // then restarting the process. This function is fully blocking. Upon restart, // the RPC connection to the node will be re-attempted, continuing iff the // connection attempt is successful. Additionally, if a callback is passed, the // closure will be executed after the node has been shutdown, but before the // process has been started up again. -func (l *lightningNode) restart(errChan chan error, callback func() error) error { - if err := l.stop(); err != nil { +func (l *lightningNode) Restart(errChan chan error, callback func() error) error { + if err := l.Stop(); err != nil { return nil } <-l.processExit l.processExit = make(chan struct{}) + l.quit = make(chan struct{}) + l.wg = sync.WaitGroup{} if callback != nil { if err := callback(); err != nil { @@ -289,13 +305,13 @@ func (l *lightningNode) restart(errChan chan error, callback func() error) error } } - return l.start(errChan) + return l.Start(errChan) } -// shutdown stops the active lnd process and clean up any temporary directories +// Shutdown stops the active lnd process and clean up any temporary directories // created along the way. -func (l *lightningNode) shutdown() error { - if err := l.stop(); err != nil { +func (l *lightningNode) Shutdown() error { + if err := l.Stop(); err != nil { return err } if err := l.cleanup(); err != nil { @@ -304,6 +320,220 @@ func (l *lightningNode) shutdown() error { return nil } +// closeChanWatchRequest is a request to the lightningNetworkWatcher to be +// notified once it's detected within the test Lightning Network, that a +// channel has either been added or closed. +type chanWatchRequest struct { + chanPoint wire.OutPoint + + chanOpen bool + + eventChan chan struct{} +} + +// lightningNetworkWatcher is a goroutine which is able to dispatch +// notifications once it has been observed that a target channel has been +// closed or opened within the network. In order to dispatch these +// notifications, the GraphTopologySubscription client exposed as part of the +// gRPC interface is used. +func (l *lightningNode) lightningNetworkWatcher() { + defer l.wg.Done() + + graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) + go func() { + ctxb := context.Background() + req := &lnrpc.GraphTopologySubscription{} + topologyClient, err := l.SubscribeChannelGraph(ctxb, req) + if err != nil { + // We panic here in case of an error as failure to + // create the topology client will cause all subsequent + // tests to fail. + panic(fmt.Errorf("unable to create topology "+ + "client: %v", err)) + } + + for { + update, err := topologyClient.Recv() + if err == io.EOF { + return + } else if err != nil { + // Similar to the case above, we also panic + // here (and end the tests) as these + // notifications are critical to the success of + // many tests. + panic(fmt.Errorf("unable read update ntfn: %v", err)) + } + + graphUpdates <- update + } + }() + + // For each outpoint, we'll track an integer which denotes the number + // of edges seen for that channel within the network. When this number + // reaches 2, then it means that both edge advertisements has + // propagated through the network. + openChans := make(map[wire.OutPoint]int) + openClients := make(map[wire.OutPoint][]chan struct{}) + + closedChans := make(map[wire.OutPoint]struct{}) + closeClients := make(map[wire.OutPoint][]chan struct{}) + + for { + select { + + // A new graph update has just been received, so we'll examine + // the current set of registered clients to see if we can + // dispatch any requests. + case graphUpdate := <-graphUpdates: + // For each new channel, we'll increment the number of + // edges seen by one. + for _, newChan := range graphUpdate.ChannelUpdates { + txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) + op := wire.OutPoint{ + Hash: *txid, + Index: newChan.ChanPoint.OutputIndex, + } + openChans[op]++ + + // For this new channel, if the number of edges + // seen is less than two, then the channel + // hasn't been fully announced yet. + if numEdges := openChans[op]; numEdges < 2 { + continue + } + + // Otherwise, we'll notify all the registered + // clients and remove the dispatched clients. + for _, eventChan := range openClients[op] { + close(eventChan) + } + delete(openClients, op) + } + + // For each channel closed, we'll mark that we've + // detected a channel closure while lnd was pruning the + // channel graph. + for _, closedChan := range graphUpdate.ClosedChans { + txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) + op := wire.OutPoint{ + Hash: *txid, + Index: closedChan.ChanPoint.OutputIndex, + } + closedChans[op] = struct{}{} + + // As the channel has been closed, we'll notify + // all register clients. + for _, eventChan := range closeClients[op] { + close(eventChan) + } + delete(closeClients, op) + } + + // A new watch request, has just arrived. We'll either be able + // to dispatch immediately, or need to add the client for + // processing later. + case watchRequest := <-l.chanWatchRequests: + targetChan := watchRequest.chanPoint + + // TODO(roasbeef): add update type also, checks for + // multiple of 2 + if watchRequest.chanOpen { + // If this is a open request, then it can be + // dispatched if the number of edges seen for + // the channel is at least two. + if numEdges, _ := openChans[targetChan]; numEdges >= 2 { + close(watchRequest.eventChan) + continue + } + + // Otherwise, we'll add this to the list of + // watch open clients for this out point. + openClients[targetChan] = append(openClients[targetChan], + watchRequest.eventChan) + continue + } + + // If this is a close request, then it can be + // immediately dispatched if we've already seen a + // channel closure for this channel. + if _, ok := closedChans[targetChan]; ok { + close(watchRequest.eventChan) + continue + } + + // Otherwise, we'll add this to the list of close watch + // clients for this out point. + closeClients[targetChan] = append(closeClients[targetChan], + watchRequest.eventChan) + + case <-l.quit: + return + } + } +} + +// WaitForNetworkChannelOpen will block until a channel with the target +// outpoint is seen as being fully advertised within the network. A channel is +// considered "fully advertised" once both of its directional edges has been +// advertised within the test Lightning Network. +func (l *lightningNode) WaitForNetworkChannelOpen(ctx context.Context, + op *lnrpc.ChannelPoint) error { + + eventChan := make(chan struct{}) + + txid, err := chainhash.NewHash(op.FundingTxid) + if err != nil { + return err + } + + l.chanWatchRequests <- &chanWatchRequest{ + chanPoint: wire.OutPoint{ + Hash: *txid, + Index: op.OutputIndex, + }, + eventChan: eventChan, + chanOpen: true, + } + + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel not opened before timeout") + } +} + +// WaitForNetworkChannelClose will block until a channel with the target +// outpoint is seen as closed within the network. A channel is considered +// closed once a transaction spending the funding outpoint is seen within a +// confirmed block. +func (l *lightningNode) WaitForNetworkChannelClose(ctx context.Context, + op *lnrpc.ChannelPoint) error { + + eventChan := make(chan struct{}) + + txid, err := chainhash.NewHash(op.FundingTxid) + if err != nil { + return err + } + + l.chanWatchRequests <- &chanWatchRequest{ + chanPoint: wire.OutPoint{ + Hash: *txid, + Index: op.OutputIndex, + }, + eventChan: eventChan, + chanOpen: false, + } + + select { + case <-eventChan: + return nil + case <-ctx.Done(): + return fmt.Errorf("channel not closed before timeout") + } +} + // networkHarness is an integration testing harness for the lightning network. // The harness by default is created with two active nodes on the network: // Alice and Bob. @@ -322,8 +552,6 @@ type networkHarness struct { seenTxns chan chainhash.Hash bitcoinWatchRequests chan *txWatchRequest - chanWatchRequests chan *chanWatchRequest - // Channel for transmitting stderr output from failed lightning node // to main process. lndErrorChan chan error @@ -340,7 +568,6 @@ func newNetworkHarness() (*networkHarness, error) { activeNodes: make(map[int]*lightningNode), seenTxns: make(chan chainhash.Hash), bitcoinWatchRequests: make(chan *txWatchRequest), - chanWatchRequests: make(chan *chanWatchRequest), lndErrorChan: make(chan error), }, nil } @@ -406,7 +633,7 @@ func (n *networkHarness) SetUp() error { go func() { var err error defer wg.Done() - if err = n.Alice.start(n.lndErrorChan); err != nil { + if err = n.Alice.Start(n.lndErrorChan); err != nil { errChan <- err return } @@ -414,7 +641,7 @@ func (n *networkHarness) SetUp() error { go func() { var err error defer wg.Done() - if err = n.Bob.start(n.lndErrorChan); err != nil { + if err = n.Bob.Start(n.lndErrorChan); err != nil { errChan <- err return } @@ -505,7 +732,7 @@ out: // TearDownAll tears down all active nodes within the test lightning network. func (n *networkHarness) TearDownAll() error { for _, node := range n.activeNodes { - if err := node.shutdown(); err != nil { + if err := node.Shutdown(); err != nil { return err } } @@ -525,7 +752,7 @@ func (n *networkHarness) NewNode(extraArgs []string) (*lightningNode, error) { return nil, err } - if err := node.start(n.lndErrorChan); err != nil { + if err := node.Start(n.lndErrorChan); err != nil { return nil, err } @@ -563,7 +790,7 @@ func (n *networkHarness) ConnectNodes(ctx context.Context, a, b *lightningNode) // and invalidated prior state, or persistent state recovery, simulating node // crashes, etc. func (n *networkHarness) RestartNode(node *lightningNode, callback func() error) error { - return node.restart(n.lndErrorChan, callback) + return node.Restart(n.lndErrorChan, callback) } // TODO(roasbeef): add a WithChannel higher-order function? @@ -651,186 +878,6 @@ func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid chainhash. } } -// closeChanWatchRequest is a request to the lightningNetworkWatcher to be -// notified once it's detected within the test Lightning Network, that a -// channel has either been added or closed. -type chanWatchRequest struct { - chanPoint wire.OutPoint - - chanOpen bool - - eventChan chan struct{} -} - -// lightningNetworkWatcher is a goroutine which is able to dispatch -// notifications once it has been observed that a target channel has been -// closed or opened within the network. In order to dispatch these -// notifications, the GraphTopologySubscription client exposed as part of the -// gRPC interface is used. -// -// TODO(roasbeef): allow caller to select target nodes to recv ntfn from? -func (n *networkHarness) lightningNetworkWatcher() { - graphUpdates := make(chan *lnrpc.GraphTopologyUpdate) - go func() { - ctxb := context.Background() - req := &lnrpc.GraphTopologySubscription{} - topologyClient, err := n.Alice.SubscribeChannelGraph(ctxb, req) - if err != nil { - // We panic here in case of an error as failure to - // create the topology client will cause all subsequent - // tests to fail. - panic(fmt.Errorf("unable to create topology "+ - "client: %v", err)) - } - - graphUpdate, err := topologyClient.Recv() - if err == io.EOF { - return - } else if err != nil { - } - - graphUpdates <- graphUpdate - }() - - // For each outpoint, we'll track an integer which denotes the number - // of edges seen for that channel within the network. When this number - // reaches 2, then it means that both edge advertisements has - // propagated through the network. - openChans := make(map[wire.OutPoint]int) - openClients := make(map[wire.OutPoint][]chan struct{}) - - closedChans := make(map[wire.OutPoint]struct{}) - closeClients := make(map[wire.OutPoint][]chan struct{}) - - for { - select { - - // A new graph update has just been received, so we'll examine - // the current set of registered clients to see if we can - // dispatch any requests. - case graphUpdate := <-graphUpdates: - - // For each new channel, we'll increment the number of - // edges seen by one. - for _, newChan := range graphUpdate.ChannelUpdates { - txid, _ := chainhash.NewHash(newChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: newChan.ChanPoint.OutputIndex, - } - openChans[op] += 1 - } - - // For each channel closed, we'll mark that we've - // detected a channel closure while lnd was pruning the - // channel graph. - for _, closedChan := range graphUpdate.ClosedChans { - txid, _ := chainhash.NewHash(closedChan.ChanPoint.FundingTxid) - op := wire.OutPoint{ - Hash: *txid, - Index: closedChan.ChanPoint.OutputIndex, - } - closedChans[op] = struct{}{} - } - - // A new watch request, has just arrived. We'll either be able - // to dispatch immediately, or need to add the client for - // processing later. - case watchRequest := <-n.chanWatchRequests: - targetChan := watchRequest.chanPoint - - if watchRequest.chanOpen { - // If this is a open request, then it can be - // dispatched if the number of edges seen for - // the channel is at least two. - if numEdges, _ := openChans[targetChan]; numEdges >= 2 { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of - // watch open clients for this out point. - openClients[targetChan] = append(openClients[targetChan], - watchRequest.eventChan) - } - - // If this is a close request, then it can be - // immediately dispatched if we've already seen a - // channel closure for this channel. - if _, ok := closedChans[targetChan]; ok { - close(watchRequest.eventChan) - continue - } - - // Otherwise, we'll add this to the list of close watch - // clients for this out point. - closeClients[targetChan] = append(closeClients[targetChan], - watchRequest.eventChan) - } - - } -} - -// WaitForNetworkChannelOpen will block until a channel with the target -// outpoint is seen as being fully advertised within the network. A channel is -// considered "fully advertised" once both of its directional edges has been -// advertised within the test Lightning Network. -func (n *networkHarness) WaitForNetworkChannelOpen(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - n.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - chanOpen: true, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not opened before timeout") - } -} - -// WaitForNetworkChannelClose will block until a channel with the target -// outpoint is seen as closed within the network. A channel is considered -// closed once a transaction spending the funding outpoint is seen within a -// confirmed block. -func (n *networkHarness) WaitForNetworkChannelClose(ctx context.Context, - op *lnrpc.ChannelPoint) error { - - eventChan := make(chan struct{}) - - txid, err := chainhash.NewHash(op.FundingTxid) - if err != nil { - return err - } - - n.chanWatchRequests <- &chanWatchRequest{ - chanPoint: wire.OutPoint{ - Hash: *txid, - Index: op.OutputIndex, - }, - chanOpen: false, - } - - select { - case <-eventChan: - return nil - case <-ctx.Done(): - return fmt.Errorf("channel not closed before timeout") - } -} - // OpenChannel attempts to open a channel between srcNode and destNode with the // passed channel funding parameters. If the passed context has a timeout, then // if the timeout is reached before the channel pending notification is diff --git a/rpcserver.go b/rpcserver.go index 9839b3c1..7d8d4d82 100644 --- a/rpcserver.go +++ b/rpcserver.go @@ -1722,8 +1722,6 @@ func (r *rpcServer) SubscribeChannelGraph(req *lnrpc.GraphTopologySubscription, return nil } } - - return nil } // marshallTopologyChange performs a mapping from the topology change sturct