From 5410725306af7c34b0b01a650888b684a04edc89 Mon Sep 17 00:00:00 2001 From: PaddyQuinn Date: Sat, 10 Mar 2018 10:21:10 -0500 Subject: [PATCH 1/3] lnwallet: update comments --- lnwallet/reservation.go | 16 ++++++++-------- lnwallet/wallet.go | 17 +++++++---------- 2 files changed, 15 insertions(+), 18 deletions(-) diff --git a/lnwallet/reservation.go b/lnwallet/reservation.go index 21a365bd..2865c74b 100644 --- a/lnwallet/reservation.go +++ b/lnwallet/reservation.go @@ -58,8 +58,8 @@ type InputScript struct { } // ChannelReservation represents an intent to open a lightning payment channel -// a counterparty. The funding processes from reservation to channel opening is -// a 3-step process. In order to allow for full concurrency during the +// with a counterparty. The funding processes from reservation to channel opening +// is a 3-step process. In order to allow for full concurrency during the // reservation workflow, resources consumed by a contribution are "locked" // themselves. This prevents a number of race conditions such as two funding // transactions double-spending the same input. A reservation can also be @@ -69,12 +69,12 @@ type InputScript struct { // The reservation workflow consists of the following three steps: // 1. lnwallet.InitChannelReservation // * One requests the wallet to allocate the necessary resources for a -// channel reservation. These resources a put in limbo for the lifetime -// of a reservation. -// * Once completed the reservation will have the wallet's contribution -// accessible via the .OurContribution() method. This contribution -// contains the necessary items to allow the remote party to build both -// the funding, and commitment transactions. +// channel reservation. These resources are put in limbo for the lifetime +// of a reservation. +// * Once completed the reservation will have the wallet's contribution +// accessible via the .OurContribution() method. This contribution +// contains the necessary items to allow the remote party to build both +// the funding, and commitment transactions. // 2. ChannelReservation.ProcessContribution/ChannelReservation.ProcessSingleContribution // * The counterparty presents their contribution to the payment channel. // This allows us to build the funding, and commitment transactions diff --git a/lnwallet/wallet.go b/lnwallet/wallet.go index 61f09c2f..f64efc95 100644 --- a/lnwallet/wallet.go +++ b/lnwallet/wallet.go @@ -52,9 +52,8 @@ func (e *ErrInsufficientFunds) Error() string { // will be created in order to track the lifetime of this pending channel. // Outputs selected will be 'locked', making them unavailable, for any other // pending reservations. Therefore, all channels in reservation limbo will be -// periodically after a timeout period in order to avoid "exhaustion" attacks. -// -// TODO(roasbeef): zombie reservation sweeper goroutine. +// periodically timed out after an idle period in order to avoid "exhaustion" +// attacks. type initFundingReserveMsg struct { // chainHash denotes that chain to be used to ultimately open the // target channel. @@ -261,8 +260,6 @@ type LightningWallet struct { fundingLimbo map[uint64]*ChannelReservation nextFundingID uint64 limboMtx sync.RWMutex - // TODO(roasbeef): zombie garbage collection routine to solve - // lost-object/starvation problem/attack. // lockedOutPoints is a set of the currently locked outpoint. This // information is kept in order to provide an easy way to unlock all @@ -366,7 +363,7 @@ func (l *LightningWallet) ActiveReservations() []*ChannelReservation { } // requestHandler is the primary goroutine(s) responsible for handling, and -// dispatching relies to all messages. +// dispatching replies to all messages. func (l *LightningWallet) requestHandler() { out: for { @@ -403,14 +400,14 @@ out: // successful, a ChannelReservation containing our completed contribution is // returned. Our contribution contains all the items necessary to allow the // counterparty to build the funding transaction, and both versions of the -// commitment transaction. Otherwise, an error occurred a nil pointer along with -// an error are returned. +// commitment transaction. Otherwise, an error occurred and a nil pointer along +// with an error are returned. // // Once a ChannelReservation has been obtained, two additional steps must be // processed before a payment channel can be considered 'open'. The second step // validates, and processes the counterparty's channel contribution. The third, // and final step verifies all signatures for the inputs of the funding -// transaction, and that the signature we records for our version of the +// transaction, and that the signature we record for our version of the // commitment transaction is valid. func (l *LightningWallet) InitChannelReservation( capacity, ourFundAmt btcutil.Amount, pushMSat lnwire.MilliSatoshi, @@ -579,7 +576,7 @@ func (l *LightningWallet) handleFundingReserveRequest(req *initFundingReserveMsg reservation.partialState.RevocationProducer = producer reservation.ourContribution.ChannelConstraints = l.Cfg.DefaultConstraints - // TODO(roasbeef): turn above into: initContributio() + // TODO(roasbeef): turn above into: initContribution() // Create a limbo and record entry for this newly pending funding // request. From 75e45b830beab637fe7d844bb9c52f66e919dcf5 Mon Sep 17 00:00:00 2001 From: PaddyQuinn Date: Mon, 12 Mar 2018 21:58:51 -0400 Subject: [PATCH 2/3] funding: implement reservation zombie sweeper Before previous commits were squashed into this commit, zombie reservations were cleaned up individually when they timed out. However, this made the code more complex because each reservation had its own individual timer and thus it would have required the timer being cancelled any time the reservation was cancelled, which would have been harder to maintain. With this commit, zombie reservations are cleaned up by a zombie sweeper that is set off by a ticker instead, to make the code more maintainable. --- fundingmanager.go | 101 ++++++++++++++++++++++++++++++++++++++++++---- lnd.go | 2 + 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 9e29de16..41177d58 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -70,10 +70,29 @@ type reservationWithCtx struct { chanAmt btcutil.Amount + lastUpdated time.Time + updates chan *lnrpc.OpenStatusUpdate err chan error } +// isLocked checks the reservation's timestamp to determine whether it is locked. +func (r *reservationWithCtx) isLocked() bool { + // The time zero value represents a locked reservation. + return r.lastUpdated.IsZero() +} + +// lock locks the reservation from zombie pruning by setting its timestamp to the +// zero value. +func (r *reservationWithCtx) lock() { + r.lastUpdated = time.Time{} +} + +// updateTimestamp updates the reservation's timestamp with the current time. +func (r *reservationWithCtx) updateTimestamp() { + r.lastUpdated = time.Now() +} + // initFundingMsg is sent by an outside subsystem to the funding manager in // order to kick off a funding workflow with a specified target peer. The // original request which defines the parameters of the funding workflow are @@ -266,6 +285,14 @@ type fundingConfig struct { // discovered short channel ID of a formerly pending channel to outside // sub-systems. ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error + + // ZombieSweeperInterval is the periodic time interval in which the zombie + // sweeper is run. + ZombieSweeperInterval time.Duration + + // ReservationTimeout is the length of idle time that must pass before a + // reservation is considered a zombie. + ReservationTimeout time.Duration } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -307,7 +334,7 @@ type fundingManager struct { signedReservations map[lnwire.ChannelID][32]byte // resMtx guards both of the maps above to ensure that all access is - // goroutine stafe. + // goroutine safe. resMtx sync.RWMutex // fundingMsgs is a channel which receives wrapped wire messages @@ -764,6 +791,9 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, func (f *fundingManager) reservationCoordinator() { defer f.wg.Done() + zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval) + defer zombieSweepTicker.Stop() + for { select { @@ -786,6 +816,9 @@ func (f *fundingManager) reservationCoordinator() { case req := <-f.fundingRequests: f.handleInitFundingMsg(req) + case <-zombieSweepTicker.C: + f.pruneZombieReservations() + case req := <-f.queries: switch msg := req.(type) { case *pendingChansReq: @@ -941,20 +974,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { amt, msg.PushAmount) // Once the reservation has been created successfully, we add it to - // this peers map of pending reservations to track this particular + // this peer's map of pending reservations to track this particular // reservation until either abort or completion. f.resMtx.Lock() if _, ok := f.activeReservations[peerIDKey]; !ok { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ reservation: reservation, chanAmt: amt, err: make(chan error, 1), peerAddress: fmsg.peerAddress, } + f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the fundingOpenMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt) @@ -1063,6 +1100,9 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { return } + // Update the timestamp once the fundingAcceptMsg has been handled. + defer resCtx.updateTimestamp() + fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:]) // We'll also specify the responder's preference for the number of @@ -1304,6 +1344,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { f.localDiscoverySignals[channelID] = make(chan struct{}) f.localDiscoveryMtx.Unlock() + // At this point we have sent our last funding message to the + // initiating peer before the funding transaction will be broadcast. + // The only thing left to do before we can delete this reservation + // is wait for the funding transaction. Lock the reservation so it + // is not pruned by the zombie sweeper. + resCtx.lock() + // With this last message, our job as the responder is now complete. // We'll wait for the funding transaction to reach the specified number // of confirmations, then start normal operations. @@ -1454,6 +1501,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { }, } + // At this point we have broadcast the funding transaction and done all + // necessary processing. The only thing left to do before we can delete + // this reservation is wait for the funding transaction. Lock the + // reservation so it is not pruned by the zombie sweeper. + resCtx.lock() + f.wg.Add(1) go func() { defer f.wg.Done() @@ -2454,15 +2507,19 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][chanID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ chanAmt: capacity, reservation: reservation, peerAddress: msg.peerAddress, updates: msg.updates, err: msg.err, } + f.activeReservations[peerIDKey][chanID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the initFundingMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity) @@ -2544,7 +2601,7 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { } } -// processErrorGeneric sends a message to the fundingManager allowing it to +// processFundingError sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processFundingError(err *lnwire.Error, peerAddress *lnwire.NetAddress) { @@ -2556,7 +2613,7 @@ func (f *fundingManager) processFundingError(err *lnwire.Error, } } -// handleErrorGenericMsg process the error which was received from remote peer, +// handleErrorMsg processes the error which was received from remote peer, // depending on the type of error we should do different clean up steps and // inform the user about it. func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { @@ -2589,7 +2646,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { lnErr.ToGrpcCode(), string(protocolErr.Data), ) } else { - // Otherwise, we'll attempt tto display just the error code + // Otherwise, we'll attempt to display just the error code // itself. resCtx.err <- grpc.Errorf( lnErr.ToGrpcCode(), lnErr.String(), @@ -2602,7 +2659,35 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { } } -// cancelReservationCtx do all needed work in order to securely cancel the +// pruneZombieReservations loops through all pending reservations and fails the +// funding flow for any reservations that have not been updated since the +// ReservationTimeout and are not locked waiting for the funding transaction. +func (f *fundingManager) pruneZombieReservations() { + zombieReservations := make(pendingChannels) + + f.resMtx.RLock() + for _, pendingReservations := range f.activeReservations { + for pendingChanID, resCtx := range pendingReservations { + if resCtx.isLocked() { + continue + } + + if time.Since(resCtx.lastUpdated) > f.cfg.ReservationTimeout { + zombieReservations[pendingChanID] = resCtx + } + } + } + f.resMtx.RUnlock() + + for pendingChanID, resCtx := range zombieReservations { + err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ + "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + fndgLog.Warnf(err.Error()) + f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + } +} + +// cancelReservationCtx does all needed work in order to securely cancel the // reservation. func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, pendingChanID [32]byte) (*reservationWithCtx, error) { diff --git a/lnd.go b/lnd.go index b2685f90..9b2f5088 100644 --- a/lnd.go +++ b/lnd.go @@ -414,6 +414,8 @@ func lndMain() error { // channel bandwidth. return uint16(lnwallet.MaxHTLCNumber / 2) }, + ZombieSweeperInterval: 1 * time.Minute, + ReservationTimeout: 10 * time.Minute, }) if err != nil { return err From c67f1408eb0750db688461a0470a31bfebd1ecff Mon Sep 17 00:00:00 2001 From: PaddyQuinn Date: Mon, 12 Mar 2018 22:11:40 -0400 Subject: [PATCH 3/3] funding: add zombie sweeper tests Note: This commit also creates a function called assertFundingMsgSent, which replaces checkNodeSendingFundingLocked by doing the same work not just for FundingLocked messages but also for AcceptChannel, FundingCreated, and FundingLocked messages. --- fundingmanager_test.go | 454 ++++++++++++++++++++++++++++++++--------- 1 file changed, 361 insertions(+), 93 deletions(-) diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 3fbc27b4..1c9df88e 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -107,6 +107,7 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, Confirmed: m.oneConfChannel, }, nil } + func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: m.epochChan, @@ -121,6 +122,7 @@ func (m *mockNotifier) Start() error { func (m *mockNotifier) Stop() error { return nil } + func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32) (*chainntnfs.SpendEvent, error) { return &chainntnfs.SpendEvent{ @@ -304,6 +306,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error { return nil }, + ZombieSweeperInterval: 1 * time.Hour, + ReservationTimeout: 1 * time.Nanosecond, }) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) @@ -381,6 +385,8 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { publishChan <- txn return nil }, + ZombieSweeperInterval: oldCfg.ZombieSweeperInterval, + ReservationTimeout: oldCfg.ReservationTimeout, }) if err != nil { t.Fatalf("failed recreating aliceFundingManager: %v", err) @@ -485,68 +491,26 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, // Let Bob handle the init message. bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) - // Bob should answer with an AcceptChannel. - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send AcceptChannel message") - } - - acceptChannelResponse, ok := bobMsg.(*lnwire.AcceptChannel) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected AcceptChannel to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected AcceptChannel to be sent from bob, "+ - "instead got %T", bobMsg) - } + // Bob should answer with an AcceptChannel message. + acceptChannelResponse := assertFundingMsgSent( + t, bob.msgChan, "AcceptChannel", + ).(*lnwire.AcceptChannel) // Forward the response to Alice. alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) - // Alice responds with a FundingCreated messages. - select { - case aliceMsg = <-alice.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send FundingCreated message") - } - fundingCreated, ok := aliceMsg.(*lnwire.FundingCreated) - if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingCreated to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingCreated to be sent from "+ - "alice, instead got %T", aliceMsg) - } + // Alice responds with a FundingCreated message. + fundingCreated := assertFundingMsgSent( + t, alice.msgChan, "FundingCreated", + ).(*lnwire.FundingCreated) // Give the message to Bob. bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) // Finally, Bob should send the FundingSigned message. - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send FundingSigned message") - } - - fundingSigned, ok := bobMsg.(*lnwire.FundingSigned) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingSigned to be "+ - "sent from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingSigned to be sent from "+ - "bob, instead got %T", bobMsg) - } + fundingSigned := assertFundingMsgSent( + t, bob.msgChan, "FundingSigned", + ).(*lnwire.FundingSigned) // Forward the signature to Alice. alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) @@ -581,6 +545,81 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, return fundingOutPoint } +func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) { + select { + case <-msgChan: + t.Fatalf("error sent unexpectedly") + case <- time.After(100 * time.Millisecond): + // Expected, return. + } +} + +func assertErrorSent(t *testing.T, msgChan chan lnwire.Message) { + var msg lnwire.Message + select { + case msg = <-msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("node did not send Error message") + } + _, ok := msg.(*lnwire.Error) + if !ok { + t.Fatalf("expected Error to be sent from "+ + "node, instead got %T", msg) + } +} + +func assertFundingMsgSent(t *testing.T, msgChan chan lnwire.Message, + msgType string) lnwire.Message { + var msg lnwire.Message + select { + case msg = <-msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("peer did not send %s message", msgType) + } + + var ( + sentMsg lnwire.Message + ok bool + ) + switch msgType { + case "AcceptChannel": + sentMsg, ok = msg.(*lnwire.AcceptChannel) + case "FundingCreated": + sentMsg, ok = msg.(*lnwire.FundingCreated) + case "FundingSigned": + sentMsg, ok = msg.(*lnwire.FundingSigned) + case "FundingLocked": + sentMsg, ok = msg.(*lnwire.FundingLocked) + default: + t.Fatalf("unknown message type: %s", msgType) + } + + if !ok { + errorMsg, gotError := msg.(*lnwire.Error) + if gotError { + t.Fatalf("expected %s to be sent, instead got error: %v", + msgType, lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected %s to be sent, instead got %T", + msgType, msg) + } + + return sentMsg +} + +func assertNumPendingReservations(t *testing.T, node *testNode, + peerPubKey *btcec.PublicKey, expectedNum int) { + serializedPubKey := newSerializedKey(peerPubKey) + actualNum := len(node.fundingMgr.activeReservations[serializedPubKey]) + if actualNum == expectedNum { + // Success, return. + return + } + + t.Fatalf("Expected node to have %d pending reservations, had %v", + expectedNum, actualNum) +} + func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) { var numPendingChans int for i := 0; i < testPollNumTries; i++ { @@ -665,28 +704,6 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode, assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } -func checkNodeSendingFundingLocked(t *testing.T, node *testNode) *lnwire.FundingLocked { - var msg lnwire.Message - select { - case msg = <-node.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("node did not send fundingLocked") - } - - fundingLocked, ok := msg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := msg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from node, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from node, "+ - "instead got %T", msg) - } - return fundingLocked -} - func assertFundingLockedSent(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) @@ -874,7 +891,20 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan, true) - // Notify that transaction was mined + // Make sure both reservations time out and then run both zombie sweepers. + time.Sleep(1 * time.Millisecond) + go alice.fundingMgr.pruneZombieReservations() + go bob.fundingMgr.pruneZombieReservations() + + // Check that neither Alice nor Bob sent an error message. + assertErrorNotSent(t, alice.msgChan) + assertErrorNotSent(t, bob.msgChan) + + // Check that neither reservation has been pruned. + assertNumPendingReservations(t, alice, bobPubKey, 1) + assertNumPendingReservations(t, bob, alicePubKey, 1) + + // Notify that transaction was mined. alice.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{} bob.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{} @@ -885,10 +915,14 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -970,7 +1004,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) { } // Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Alice should still be markedOpen assertDatabaseState(t, alice, fundingOutPoint, markedOpen) @@ -987,7 +1023,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) { return fmt.Errorf("intentional error in SendAnnouncement") } - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // The state should now be fundingLockedSent assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) @@ -1088,7 +1126,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) { } // Bob will send funding locked to Alice - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Alice should still be markedOpen assertDatabaseState(t, alice, fundingOutPoint, markedOpen) @@ -1131,7 +1171,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) { close(con) // This should make Alice send the fundingLocked. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // The state should now be fundingLockedSent assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) @@ -1168,6 +1210,212 @@ func TestFundingManagerOfflinePeer(t *testing.T) { assertNoChannelState(t, alice, bob, fundingOutPoint) } +// TestFundingManagerPeerTimeoutAfterInitFunding checks that the zombie sweeper +// will properly clean up a zombie reservation that times out after the +// initFundingMsg has been handled. +func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) { + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Create a funding request and start the workflow. + errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: 500000, + pushAmt: lnwire.NewMSatFromSatoshis(0), + private: false, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + _, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Alice should have a new pending reservation. + assertNumPendingReservations(t, alice, bobPubKey, 1) + + // Make sure Alice's reservation times out and then run her zombie sweeper. + time.Sleep(1 * time.Millisecond) + go alice.fundingMgr.pruneZombieReservations() + + // Alice should have sent an Error message to Bob. + assertErrorSent(t, alice.msgChan) + + // Alice's zombie reservation should have been pruned. + assertNumPendingReservations(t, alice, bobPubKey, 0) +} + +// TestFundingManagerPeerTimeoutAfterFundingOpen checks that the zombie sweeper +// will properly clean up a zombie reservation that times out after the +// fundingOpenMsg has been handled. +func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Create a funding request and start the workflow. + errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: 500000, + pushAmt: lnwire.NewMSatFromSatoshis(0), + private: false, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Alice should have a new pending reservation. + assertNumPendingReservations(t, alice, bobPubKey, 1) + + // Let Bob handle the init message. + bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + + // Bob should answer with an AcceptChannel. + assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") + + // Bob should have a new pending reservation. + assertNumPendingReservations(t, bob, alicePubKey, 1) + + // Make sure Bob's reservation times out and then run his zombie sweeper. + time.Sleep(1 * time.Millisecond) + go bob.fundingMgr.pruneZombieReservations() + + // Bob should have sent an Error message to Alice. + assertErrorSent(t, bob.msgChan) + + // Bob's zombie reservation should have been pruned. + assertNumPendingReservations(t, bob, alicePubKey, 0) +} + +// TestFundingManagerPeerTimeoutAfterFundingAccept checks that the zombie sweeper +// will properly clean up a zombie reservation that times out after the +// fundingAcceptMsg has been handled. +func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Create a funding request and start the workflow. + errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: 500000, + pushAmt: lnwire.NewMSatFromSatoshis(0), + private: false, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Alice should have a new pending reservation. + assertNumPendingReservations(t, alice, bobPubKey, 1) + + // Let Bob handle the init message. + bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + + // Bob should answer with an AcceptChannel. + acceptChannelResponse := assertFundingMsgSent( + t, bob.msgChan, "AcceptChannel", + ).(*lnwire.AcceptChannel) + + // Bob should have a new pending reservation. + assertNumPendingReservations(t, bob, alicePubKey, 1) + + // Forward the response to Alice. + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + + // Alice responds with a FundingCreated messages. + assertFundingMsgSent(t, alice.msgChan, "FundingCreated") + + // Make sure Alice's reservation times out and then run her zombie sweeper. + time.Sleep(1 * time.Millisecond) + go alice.fundingMgr.pruneZombieReservations() + + // Alice should have sent an Error message to Bob. + assertErrorSent(t, alice.msgChan) + + // Alice's zombie reservation should have been pruned. + assertNumPendingReservations(t, alice, bobPubKey, 0) +} + func TestFundingManagerFundingTimeout(t *testing.T) { alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1208,7 +1456,7 @@ func TestFundingManagerFundingTimeout(t *testing.T) { } // TestFundingManagerFundingNotTimeoutInitiator checks that if the user was -// the channel initiator, that it does not timeout when the lnd restarts +// the channel initiator, that it does not timeout when the lnd restarts. func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { alice, bob := setupFundingManagers(t) @@ -1297,10 +1545,14 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1394,10 +1646,14 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1464,10 +1720,14 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1530,10 +1790,14 @@ func TestFundingManagerPrivateChannel(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1606,10 +1870,14 @@ func TestFundingManagerPrivateRestart(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint)