test: use context.WithTimeout to ensure async tests don't block indefinitely

This commit uses the context package’s WithTimeout to ensure tests
which rely on asynchrony behaviors cannot block the execution of the
integration tests for ever. All tests which rely on async gRPC
notifications now perform a 3 way select: on the timeout channel
closure, an error, and the response itself.

Additionally some slight refactoring has taken place within the current
set of integration tests, eliminating some unneeded factory functions.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-17 17:34:39 -07:00
parent b264ba198f
commit 28b72d368c
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 239 additions and 136 deletions

View File

@ -28,81 +28,87 @@ func assertTxInBlock(block *btcutil.Block, txid *wire.ShaHash, t *testing.T) {
t.Fatalf("funding tx was not included in block")
}
// getChannelHelpers returns a series of helper functions as closures which may
// be useful within tests to execute common activities such as synchronously
// waiting for channels to open/close.
func getChannelHelpers(ctxb context.Context, net *networkHarness,
t *testing.T) (func(*lightningNode, *lightningNode, btcutil.Amount) *lnrpc.ChannelPoint,
func(*lightningNode, *lnrpc.ChannelPoint)) {
// openChannelAndAssert attempts to open a channel with the specified
// parameters extended from Alice to Bob. Additionally, two items are asserted
// after the channel is considered open: the funding transactino should be
// found within a block, and that Alice can report the status of the new
// channel.
func openChannelAndAssert(t *testing.T, net *networkHarness, ctx context.Context,
alice, bob *lightningNode, amount btcutil.Amount) *lnrpc.ChannelPoint {
openChannel := func(alice *lightningNode, bob *lightningNode, amount btcutil.Amount) *lnrpc.ChannelPoint {
chanOpenUpdate, err := net.OpenChannel(ctxb, alice, bob, amount, 1)
if err != nil {
t.Fatalf("unable to open channel: %v", err)
}
// Mine a block, then wait for Alice's node to notify us that the
// channel has been opened. The funding transaction should be found
// within the newly mined block.
blockHash, err := net.Miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
block, err := net.Miner.Node.GetBlock(blockHash[0])
if err != nil {
t.Fatalf("unable to get block: %v", err)
}
fundingChanPoint, err := net.WaitForChannelOpen(chanOpenUpdate)
if err != nil {
t.Fatalf("error while waiting for channel open: %v", err)
}
fundingTxID, err := wire.NewShaHash(fundingChanPoint.FundingTxid)
if err != nil {
t.Fatalf("unable to create sha hash: %v", err)
}
assertTxInBlock(block, fundingTxID, t)
// The channel should be listed in the peer information returned by
// both peers.
chanPoint := wire.OutPoint{
Hash: *fundingTxID,
Index: fundingChanPoint.OutputIndex,
}
err = net.AssertChannelExists(ctxb, alice, &chanPoint)
if err != nil {
t.Fatalf("unable to assert channel existence: %v", err)
}
return fundingChanPoint
chanOpenUpdate, err := net.OpenChannel(ctx, alice, bob, amount, 1)
if err != nil {
t.Fatalf("unable to open channel: %v", err)
}
closeChannel := func(node *lightningNode, fundingChanPoint *lnrpc.ChannelPoint) {
closeUpdates, err := net.CloseChannel(ctxb, node, fundingChanPoint, false)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// Finally, generate a single block, wait for the final close status
// update, then ensure that the closing transaction was included in the
// block.
blockHash, err := net.Miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
block, err := net.Miner.Node.GetBlock(blockHash[0])
if err != nil {
t.Fatalf("unable to get block: %v", err)
}
closingTxid, err := net.WaitForChannelClose(closeUpdates)
if err != nil {
t.Fatalf("error while waiting for channel close: %v", err)
}
assertTxInBlock(block, closingTxid, t)
// Mine a block, then wait for Alice's node to notify us that the
// channel has been opened. The funding transaction should be found
// within the newly mined block.
blockHash, err := net.Miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
block, err := net.Miner.Node.GetBlock(blockHash[0])
if err != nil {
t.Fatalf("unable to get block: %v", err)
}
fundingChanPoint, err := net.WaitForChannelOpen(ctx, chanOpenUpdate)
if err != nil {
t.Fatalf("error while waiting for channel open: %v", err)
}
fundingTxID, err := wire.NewShaHash(fundingChanPoint.FundingTxid)
if err != nil {
t.Fatalf("unable to create sha hash: %v", err)
}
assertTxInBlock(block, fundingTxID, t)
// The channel should be listed in the peer information returned by
// both peers.
chanPoint := wire.OutPoint{
Hash: *fundingTxID,
Index: fundingChanPoint.OutputIndex,
}
err = net.AssertChannelExists(ctx, alice, &chanPoint)
if err != nil {
t.Fatalf("unable to assert channel existence: %v", err)
}
return openChannel, closeChannel
return fundingChanPoint
}
// closeChannelAndAssert attemps to close a channel identified by the passed
// channel point owned by the passed lighting node. A fully blocking channel
// closure is attempted, therefore the passed context should be a child derived
// via timeout from a base parent. Additionally, once the channel has been
// detected as closed, an assertion checks that the transaction is found within
// a block.
func closeChannelAndAssert(t *testing.T, net *networkHarness,
ctx context.Context, node *lightningNode,
fundingChanPoint *lnrpc.ChannelPoint) {
closeUpdates, err := net.CloseChannel(ctx, node, fundingChanPoint, false)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
// Finally, generate a single block, wait for the final close status
// update, then ensure that the closing transaction was included in the
// block.
blockHash, err := net.Miner.Node.Generate(1)
if err != nil {
t.Fatalf("unable to generate block: %v", err)
}
block, err := net.Miner.Node.GetBlock(blockHash[0])
if err != nil {
t.Fatalf("unable to get block: %v", err)
}
closingTxid, err := net.WaitForChannelClose(ctx, closeUpdates)
if err != nil {
t.Fatalf("error while waiting for channel close: %v", err)
}
assertTxInBlock(block, closingTxid, t)
}
// testBasicChannelFunding performs a test exercising expected behavior from a
@ -111,8 +117,8 @@ func getChannelHelpers(ctxb context.Context, net *networkHarness,
// conditions. Finally, the chain itself is checked to ensure the closing
// transaction was mined.
func testBasicChannelFunding(net *networkHarness, t *testing.T) {
timeout := time.Duration(time.Second * 5)
ctxb := context.Background()
openChannel, closeChannel := getChannelHelpers(ctxb, net, t)
chanAmt := btcutil.Amount(btcutil.SatoshiPerBitcoin / 2)
@ -121,19 +127,21 @@ func testBasicChannelFunding(net *networkHarness, t *testing.T) {
// open or an error occurs in the funding process. A series of
// assertions will be executed to ensure the funding process completed
// successfully.
chanPoint := openChannel(net.Alice, net.Bob, chanAmt)
ctxt, _ := context.WithTimeout(ctxb, timeout)
chanPoint := openChannelAndAssert(t, net, ctxt, net.Alice, net.Bob, chanAmt)
// Finally, immediately close the channel. This function will also
// block until the channel is closed and will additionally assert the
// relevant channel closing post conditions.
closeChannel(net.Alice, chanPoint)
ctxt, _ = context.WithTimeout(ctxb, timeout)
closeChannelAndAssert(t, net, ctxt, net.Alice, chanPoint)
}
// testChannelBalance creates a new channel between Alice and Bob, then
// checks channel balance to be equal amount specified while creation of channel.
func testChannelBalance(net *networkHarness, t *testing.T) {
timeout := time.Duration(time.Second * 5)
ctxb := context.Background()
openChannel, closeChannel := getChannelHelpers(ctxb, net, t)
// Creates a helper closure to be used below which asserts the proper
// response to a channel balance RPC.
@ -152,7 +160,8 @@ func testChannelBalance(net *networkHarness, t *testing.T) {
// Open a channel with 0.5 BTC between Alice and Bob, ensuring the
// channel has been opened properly.
amount := btcutil.Amount(btcutil.SatoshiPerBitcoin / 2)
chanPoint := openChannel(net.Alice, net.Bob, amount)
ctxt, _ := context.WithTimeout(ctxb, timeout)
chanPoint := openChannelAndAssert(t, net, ctxt, net.Alice, net.Bob, amount)
// As this is a single funder channel, Alice's balance should be
// exactly 0.5 BTC since now state transitions have taken place yet.
@ -171,7 +180,8 @@ func testChannelBalance(net *networkHarness, t *testing.T) {
// Finally close the channel between Alice and Bob, asserting that the
// channel has been properly closed on-chain.
closeChannel(net.Alice, chanPoint)
ctxt, _ = context.WithTimeout(ctxb, timeout)
closeChannelAndAssert(t, net, ctxt, net.Alice, chanPoint)
}
// testChannelForceClosure performs a test to exercise the behavior of "force"
@ -184,6 +194,7 @@ func testChannelBalance(net *networkHarness, t *testing.T) {
//
// TODO(roabeef): also add an unsettled HTLC before force closing.
func testChannelForceClosure(net *networkHarness, t *testing.T) {
timeout := time.Duration(time.Second * 5)
ctxb := context.Background()
// First establish a channel ween with a capacity of 100k satoshis
@ -198,7 +209,8 @@ func testChannelForceClosure(net *networkHarness, t *testing.T) {
if _, err := net.Miner.Node.Generate(numFundingConfs); err != nil {
t.Fatalf("unable to mine block: %v", err)
}
chanPoint, err := net.WaitForChannelOpen(chanOpenUpdate)
ctxt, _ := context.WithTimeout(ctxb, timeout)
chanPoint, err := net.WaitForChannelOpen(ctxt, chanOpenUpdate)
if err != nil {
t.Fatalf("error while waiting for channel to open: %v", err)
}
@ -217,7 +229,8 @@ func testChannelForceClosure(net *networkHarness, t *testing.T) {
if _, err := net.Miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate block: %v", err)
}
closingTxID, err := net.WaitForChannelClose(closeUpdate)
ctxt, _ = context.WithTimeout(ctxb, timeout)
closingTxID, err := net.WaitForChannelClose(ctxt, closeUpdate)
if err != nil {
t.Fatalf("error while waiting for channel close: %v", err)
}

View File

@ -274,7 +274,6 @@ func newNetworkHarness() (*networkHarness, error) {
}, nil
}
// InitializeSeedNodes initialized alice and bob nodes given an already
// running instance of btcd's rpctest harness and extra command line flags,
// which should be formatted properly - "--arg=value".
@ -421,6 +420,8 @@ out:
bobResp.Balance == expectedBalance {
break out
}
case <-time.After(time.Second * 30):
return fmt.Errorf("balances not synced after deadline")
}
}
@ -502,17 +503,26 @@ func (n *networkHarness) OnTxAccepted(hash *wire.ShaHash, amt btcutil.Amount) {
}()
}
// WaitForTxBroadcast blocks until the target txid is seen on the network.
func (n *networkHarness) WaitForTxBroadcast(txid wire.ShaHash) {
// WaitForTxBroadcast blocks until the target txid is seen on the network. If
// the transaction isn't seen within the network before the passed timeout,
// then an error is returend.
func (n *networkHarness) WaitForTxBroadcast(ctx context.Context, txid wire.ShaHash) error {
eventChan := make(chan struct{})
n.watchRequests <- &watchRequest{txid, eventChan}
<-eventChan
select {
case <-eventChan:
return nil
case <-ctx.Done():
return fmt.Errorf("tx not seen before context timeout")
}
}
// OpenChannel attemps to open a channel between srcNode and destNode with the
// passed channel funding parameters.
// passed channel funding parameters. If the passed context has a timeout, then
// if the timeout is reeached before the channel pending notification is
// received, an error is returned.
func (n *networkHarness) OpenChannel(ctx context.Context,
srcNode, destNode *lightningNode, amt btcutil.Amount,
numConfs uint32) (lnrpc.Lightning_OpenChannelClient, error) {
@ -529,38 +539,73 @@ func (n *networkHarness) OpenChannel(ctx context.Context,
"alice and bob: %v", err)
}
// Consume the "channel pending" update. This waits until the node
// notifies us that the final message in the channel funding workflow
// has been sent to the remote node.
resp, err := respStream.Recv()
if err != nil {
return nil, fmt.Errorf("unable to read rpc resp: %v", err)
}
if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok {
return nil, fmt.Errorf("expected channel pending update, "+
"instead got %v", resp)
}
chanOpen := make(chan struct{})
errChan := make(chan error)
go func() {
// Consume the "channel pending" update. This waits until the node
// notifies us that the final message in the channel funding workflow
// has been sent to the remote node.
resp, err := respStream.Recv()
if err != nil {
errChan <- err
}
if _, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanPending); !ok {
errChan <- fmt.Errorf("expected channel pending update, "+
"instead got %v", resp)
}
return respStream, nil
close(chanOpen)
}()
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout reached before chan pending " +
"update sent")
case err := <-errChan:
return nil, err
case <-chanOpen:
return respStream, nil
}
}
// WaitForChannelOpen waits for a notification that a channel is open by
// consuming a message from the past open channel stream.
func (n *networkHarness) WaitForChannelOpen(openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) {
resp, err := openChanStream.Recv()
if err != nil {
return nil, fmt.Errorf("unable to read rpc resp: %v", err)
}
fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
if !ok {
return nil, fmt.Errorf("expected channel open update, instead got %v", resp)
}
// consuming a message from the past open channel stream. If the passed context
// has a timeout, then if the timeout is reached before the channel has been
// opened, then an error is returned.
func (n *networkHarness) WaitForChannelOpen(ctx context.Context,
openChanStream lnrpc.Lightning_OpenChannelClient) (*lnrpc.ChannelPoint, error) {
return fundingResp.ChanOpen.ChannelPoint, nil
errChan := make(chan error)
respChan := make(chan *lnrpc.ChannelPoint)
go func() {
resp, err := openChanStream.Recv()
if err != nil {
errChan <- fmt.Errorf("unable to read rpc resp: %v", err)
}
fundingResp, ok := resp.Update.(*lnrpc.OpenStatusUpdate_ChanOpen)
if !ok {
errChan <- fmt.Errorf("expected channel open update, "+
"instead got %v", resp)
}
respChan <- fundingResp.ChanOpen.ChannelPoint
}()
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout reached while waiting for " +
"channel open")
case err := <-errChan:
return nil, err
case chanPoint := <-respChan:
return chanPoint, nil
}
}
// CloseChannel close channel attempts to close the channel indicated by the
// passed channel point, initiated by the passed lnNode.
// passed channel point, initiated by the passed lnNode. If the passed context
// has a timeout, then if the timeout is reached before the channel close is
// pending, then an error is returned.
func (n *networkHarness) CloseChannel(ctx context.Context,
lnNode *lightningNode, cp *lnrpc.ChannelPoint,
force bool) (lnrpc.Lightning_CloseChannelClient, error) {
@ -574,41 +619,86 @@ func (n *networkHarness) CloseChannel(ctx context.Context,
return nil, fmt.Errorf("unable to close channel: %v", err)
}
// Consume the "channel close" update in order to wait for the closing
// transaction to be broadcast, then wait for the closing tx to be seen
// within the network.
closeResp, err := closeRespStream.Recv()
if err != nil {
return nil, fmt.Errorf("unable to read rpc resp: %v", err)
}
pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending)
if !ok {
return nil, fmt.Errorf("expected close pending update, got %v", pendingClose)
}
closeTxid, err := wire.NewShaHash(pendingClose.ClosePending.Txid)
if err != nil {
return nil, err
}
n.WaitForTxBroadcast(*closeTxid)
errChan := make(chan error)
fin := make(chan struct{})
go func() {
// Consume the "channel close" update in order to wait for the closing
// transaction to be broadcast, then wait for the closing tx to be seen
// within the network.
closeResp, err := closeRespStream.Recv()
if err != nil {
errChan <- err
return
}
pendingClose, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ClosePending)
if !ok {
errChan <- fmt.Errorf("expected channel close update, "+
"instead got %v", pendingClose)
return
}
return closeRespStream, nil
closeTxid, err := wire.NewShaHash(pendingClose.ClosePending.Txid)
if err != nil {
errChan <- err
return
}
if err := n.WaitForTxBroadcast(ctx, *closeTxid); err != nil {
errChan <- err
return
}
close(fin)
}()
// Wait until either the deadline for the context expires, an error
// occurs, or the channel close update is received.
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout reached before channel close " +
"initiated")
case err := <-errChan:
return nil, err
case <-fin:
return closeRespStream, nil
}
}
// WaitForChannelClose waits for a notification from the passed channel close
// stream that the node has deemed the channel has been fully closed.
func (n *networkHarness) WaitForChannelClose(closeChanStream lnrpc.Lightning_CloseChannelClient) (*wire.ShaHash, error) {
// TODO(roasbeef): use passed ctx to set a deadline on amount of time to
// wait.
closeResp, err := closeChanStream.Recv()
if err != nil {
return nil, fmt.Errorf("unable to read rpc resp: %v", err)
}
closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose)
if !ok {
return nil, fmt.Errorf("expected channel open update, instead got %v", closeFin)
}
// stream that the node has deemed the channel has been fully closed. If the
// passed context has a timeout, then if the timeout is reached before the
// notification is received then an error is returned.
func (n *networkHarness) WaitForChannelClose(ctx context.Context,
closeChanStream lnrpc.Lightning_CloseChannelClient) (*wire.ShaHash, error) {
return wire.NewShaHash(closeFin.ChanClose.ClosingTxid)
errChan := make(chan error)
updateChan := make(chan *lnrpc.CloseStatusUpdate_ChanClose)
go func() {
closeResp, err := closeChanStream.Recv()
if err != nil {
errChan <- err
return
}
closeFin, ok := closeResp.Update.(*lnrpc.CloseStatusUpdate_ChanClose)
if !ok {
errChan <- fmt.Errorf("expected channel close update, "+
"instead got %v", closeFin)
return
}
updateChan <- closeFin
}()
// Wait until either the deadline for the context expires, an error
// occurs, or the channel close update is received.
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout reached before update sent")
case err := <-errChan:
return nil, err
case update := <-updateChan:
return wire.NewShaHash(update.ChanClose.ClosingTxid)
}
}
// AssertChannelExists asserts that an active channel identified by