diff --git a/fundingmanager.go b/fundingmanager.go index a1fe9dbd..3f265e8c 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -70,10 +70,29 @@ type reservationWithCtx struct { chanAmt btcutil.Amount + lastUpdated time.Time + updates chan *lnrpc.OpenStatusUpdate err chan error } +// isLocked checks the reservation's timestamp to determine whether it is locked. +func (r *reservationWithCtx) isLocked() bool { + // The time zero value represents a locked reservation. + return r.lastUpdated.IsZero() +} + +// lock locks the reservation from zombie pruning by setting its timestamp to the +// zero value. +func (r *reservationWithCtx) lock() { + r.lastUpdated = time.Time{} +} + +// updateTimestamp updates the reservation's timestamp with the current time. +func (r *reservationWithCtx) updateTimestamp() { + r.lastUpdated = time.Now() +} + // initFundingMsg is sent by an outside subsystem to the funding manager in // order to kick off a funding workflow with a specified target peer. The // original request which defines the parameters of the funding workflow are @@ -266,6 +285,14 @@ type fundingConfig struct { // discovered short channel ID of a formerly pending channel to outside // sub-systems. ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error + + // ZombieSweeperInterval is the periodic time interval in which the zombie + // sweeper is run. + ZombieSweeperInterval time.Duration + + // ReservationTimeout is the length of idle time that must pass before a + // reservation is considered a zombie. + ReservationTimeout time.Duration } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -307,7 +334,7 @@ type fundingManager struct { signedReservations map[lnwire.ChannelID][32]byte // resMtx guards both of the maps above to ensure that all access is - // goroutine stafe. + // goroutine safe. 
resMtx sync.RWMutex // fundingMsgs is a channel which receives wrapped wire messages @@ -764,6 +791,9 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, func (f *fundingManager) reservationCoordinator() { defer f.wg.Done() + zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval) + defer zombieSweepTicker.Stop() + for { select { @@ -786,6 +816,9 @@ func (f *fundingManager) reservationCoordinator() { case req := <-f.fundingRequests: f.handleInitFundingMsg(req) + case <-zombieSweepTicker.C: + f.pruneZombieReservations() + case req := <-f.queries: switch msg := req.(type) { case *pendingChansReq: @@ -944,20 +977,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { amt, msg.PushAmount) // Once the reservation has been created successfully, we add it to - // this peers map of pending reservations to track this particular + // this peer's map of pending reservations to track this particular // reservation until either abort or completion. f.resMtx.Lock() if _, ok := f.activeReservations[peerIDKey]; !ok { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ reservation: reservation, chanAmt: amt, err: make(chan error, 1), peerAddress: fmsg.peerAddress, } + f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the fundingOpenMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt) @@ -1066,6 +1103,9 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { return } + // Update the timestamp once the fundingAcceptMsg has been handled. 
+ defer resCtx.updateTimestamp() + fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:]) // We'll also specify the responder's preference for the number of @@ -1307,6 +1347,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { f.localDiscoverySignals[channelID] = make(chan struct{}) f.localDiscoveryMtx.Unlock() + // At this point we have sent our last funding message to the + // initiating peer before the funding transaction will be broadcast. + // The only thing left to do before we can delete this reservation + // is wait for the funding transaction. Lock the reservation so it + // is not pruned by the zombie sweeper. + resCtx.lock() + // With this last message, our job as the responder is now complete. // We'll wait for the funding transaction to reach the specified number // of confirmations, then start normal operations. @@ -1457,6 +1504,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { }, } + // At this point we have broadcast the funding transaction and done all + // necessary processing. The only thing left to do before we can delete + // this reservation is wait for the funding transaction. Lock the + // reservation so it is not pruned by the zombie sweeper. + resCtx.lock() + f.wg.Add(1) go func() { defer f.wg.Done() @@ -2457,15 +2510,19 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][chanID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ chanAmt: capacity, reservation: reservation, peerAddress: msg.peerAddress, updates: msg.updates, err: msg.err, } + f.activeReservations[peerIDKey][chanID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the initFundingMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. 
remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity) @@ -2547,7 +2604,7 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { } } -// processErrorGeneric sends a message to the fundingManager allowing it to +// processFundingError sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processFundingError(err *lnwire.Error, peerAddress *lnwire.NetAddress) { @@ -2559,7 +2616,7 @@ func (f *fundingManager) processFundingError(err *lnwire.Error, } } -// handleErrorGenericMsg process the error which was received from remote peer, +// handleErrorMsg processes the error which was received from remote peer, // depending on the type of error we should do different clean up steps and // inform the user about it. func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { @@ -2592,7 +2649,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { lnErr.ToGrpcCode(), string(protocolErr.Data), ) } else { - // Otherwise, we'll attempt tto display just the error code + // Otherwise, we'll attempt to display just the error code // itself. resCtx.err <- grpc.Errorf( lnErr.ToGrpcCode(), lnErr.String(), @@ -2605,7 +2662,35 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { } } -// cancelReservationCtx do all needed work in order to securely cancel the +// pruneZombieReservations loops through all pending reservations and fails the +// funding flow for any reservations that have not been updated since the +// ReservationTimeout and are not locked waiting for the funding transaction. 
+func (f *fundingManager) pruneZombieReservations() { + zombieReservations := make(pendingChannels) + + f.resMtx.RLock() + for _, pendingReservations := range f.activeReservations { + for pendingChanID, resCtx := range pendingReservations { + if resCtx.isLocked() { + continue + } + + if time.Since(resCtx.lastUpdated) > f.cfg.ReservationTimeout { + zombieReservations[pendingChanID] = resCtx + } + } + } + f.resMtx.RUnlock() + + for pendingChanID, resCtx := range zombieReservations { + err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ + "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + fndgLog.Warnf(err.Error()) + f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + } +} + +// cancelReservationCtx does all needed work in order to securely cancel the // reservation. func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, pendingChanID [32]byte) (*reservationWithCtx, error) { diff --git a/fundingmanager_test.go b/fundingmanager_test.go index 3fbc27b4..1c9df88e 100644 --- a/fundingmanager_test.go +++ b/fundingmanager_test.go @@ -107,6 +107,7 @@ func (m *mockNotifier) RegisterConfirmationsNtfn(txid *chainhash.Hash, numConfs, Confirmed: m.oneConfChannel, }, nil } + func (m *mockNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { return &chainntnfs.BlockEpochEvent{ Epochs: m.epochChan, @@ -121,6 +122,7 @@ func (m *mockNotifier) Start() error { func (m *mockNotifier) Stop() error { return nil } + func (m *mockNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint, heightHint uint32) (*chainntnfs.SpendEvent, error) { return &chainntnfs.SpendEvent{ @@ -304,6 +306,8 @@ func createTestFundingManager(t *testing.T, privKey *btcec.PrivateKey, ReportShortChanID: func(wire.OutPoint, lnwire.ShortChannelID) error { return nil }, + ZombieSweeperInterval: 1 * time.Hour, + ReservationTimeout: 1 * time.Nanosecond, }) if err != nil { t.Fatalf("failed creating fundingManager: %v", err) @@ -381,6 
+385,8 @@ func recreateAliceFundingManager(t *testing.T, alice *testNode) { publishChan <- txn return nil }, + ZombieSweeperInterval: oldCfg.ZombieSweeperInterval, + ReservationTimeout: oldCfg.ReservationTimeout, }) if err != nil { t.Fatalf("failed recreating aliceFundingManager: %v", err) @@ -485,68 +491,26 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, // Let Bob handle the init message. bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) - // Bob should answer with an AcceptChannel. - var bobMsg lnwire.Message - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send AcceptChannel message") - } - - acceptChannelResponse, ok := bobMsg.(*lnwire.AcceptChannel) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected AcceptChannel to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected AcceptChannel to be sent from bob, "+ - "instead got %T", bobMsg) - } + // Bob should answer with an AcceptChannel message. + acceptChannelResponse := assertFundingMsgSent( + t, bob.msgChan, "AcceptChannel", + ).(*lnwire.AcceptChannel) // Forward the response to Alice. alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) - // Alice responds with a FundingCreated messages. - select { - case aliceMsg = <-alice.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("alice did not send FundingCreated message") - } - fundingCreated, ok := aliceMsg.(*lnwire.FundingCreated) - if !ok { - errorMsg, gotError := aliceMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingCreated to be sent "+ - "from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingCreated to be sent from "+ - "alice, instead got %T", aliceMsg) - } + // Alice responds with a FundingCreated message. 
+ fundingCreated := assertFundingMsgSent( + t, alice.msgChan, "FundingCreated", + ).(*lnwire.FundingCreated) // Give the message to Bob. bob.fundingMgr.processFundingCreated(fundingCreated, aliceAddr) // Finally, Bob should send the FundingSigned message. - select { - case bobMsg = <-bob.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("bob did not send FundingSigned message") - } - - fundingSigned, ok := bobMsg.(*lnwire.FundingSigned) - if !ok { - errorMsg, gotError := bobMsg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingSigned to be "+ - "sent from bob, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingSigned to be sent from "+ - "bob, instead got %T", bobMsg) - } + fundingSigned := assertFundingMsgSent( + t, bob.msgChan, "FundingSigned", + ).(*lnwire.FundingSigned) // Forward the signature to Alice. alice.fundingMgr.processFundingSigned(fundingSigned, bobAddr) @@ -581,6 +545,81 @@ func openChannel(t *testing.T, alice, bob *testNode, localFundingAmt, return fundingOutPoint } +func assertErrorNotSent(t *testing.T, msgChan chan lnwire.Message) { + select { + case <-msgChan: + t.Fatalf("error sent unexpectedly") + case <- time.After(100 * time.Millisecond): + // Expected, return. 
+ } +} + +func assertErrorSent(t *testing.T, msgChan chan lnwire.Message) { + var msg lnwire.Message + select { + case msg = <-msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("node did not send Error message") + } + _, ok := msg.(*lnwire.Error) + if !ok { + t.Fatalf("expected Error to be sent from "+ + "node, instead got %T", msg) + } +} + +func assertFundingMsgSent(t *testing.T, msgChan chan lnwire.Message, + msgType string) lnwire.Message { + var msg lnwire.Message + select { + case msg = <-msgChan: + case <-time.After(time.Second * 5): + t.Fatalf("peer did not send %s message", msgType) + } + + var ( + sentMsg lnwire.Message + ok bool + ) + switch msgType { + case "AcceptChannel": + sentMsg, ok = msg.(*lnwire.AcceptChannel) + case "FundingCreated": + sentMsg, ok = msg.(*lnwire.FundingCreated) + case "FundingSigned": + sentMsg, ok = msg.(*lnwire.FundingSigned) + case "FundingLocked": + sentMsg, ok = msg.(*lnwire.FundingLocked) + default: + t.Fatalf("unknown message type: %s", msgType) + } + + if !ok { + errorMsg, gotError := msg.(*lnwire.Error) + if gotError { + t.Fatalf("expected %s to be sent, instead got error: %v", + msgType, lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected %s to be sent, instead got %T", + msgType, msg) + } + + return sentMsg +} + +func assertNumPendingReservations(t *testing.T, node *testNode, + peerPubKey *btcec.PublicKey, expectedNum int) { + serializedPubKey := newSerializedKey(peerPubKey) + actualNum := len(node.fundingMgr.activeReservations[serializedPubKey]) + if actualNum == expectedNum { + // Success, return. 
+ return + } + + t.Fatalf("Expected node to have %d pending reservations, had %v", + expectedNum, actualNum) +} + func assertNumPendingChannelsBecomes(t *testing.T, node *testNode, expectedNum int) { var numPendingChans int for i := 0; i < testPollNumTries; i++ { @@ -665,28 +704,6 @@ func assertMarkedOpen(t *testing.T, alice, bob *testNode, assertDatabaseState(t, bob, fundingOutPoint, markedOpen) } -func checkNodeSendingFundingLocked(t *testing.T, node *testNode) *lnwire.FundingLocked { - var msg lnwire.Message - select { - case msg = <-node.msgChan: - case <-time.After(time.Second * 5): - t.Fatalf("node did not send fundingLocked") - } - - fundingLocked, ok := msg.(*lnwire.FundingLocked) - if !ok { - errorMsg, gotError := msg.(*lnwire.Error) - if gotError { - t.Fatalf("expected FundingLocked to be sent "+ - "from node, instead got error: %v", - lnwire.ErrorCode(errorMsg.Data[0])) - } - t.Fatalf("expected FundingLocked to be sent from node, "+ - "instead got %T", msg) - } - return fundingLocked -} - func assertFundingLockedSent(t *testing.T, alice, bob *testNode, fundingOutPoint *wire.OutPoint) { assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) @@ -874,7 +891,20 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { fundingOutPoint := openChannel(t, alice, bob, 500000, 0, 1, updateChan, true) - // Notify that transaction was mined + // Make sure both reservations time out and then run both zombie sweepers. + time.Sleep(1 * time.Millisecond) + go alice.fundingMgr.pruneZombieReservations() + go bob.fundingMgr.pruneZombieReservations() + + // Check that neither Alice nor Bob sent an error message. + assertErrorNotSent(t, alice.msgChan) + assertErrorNotSent(t, bob.msgChan) + + // Check that neither reservation has been pruned. + assertNumPendingReservations(t, alice, bobPubKey, 1) + assertNumPendingReservations(t, bob, alicePubKey, 1) + + // Notify that transaction was mined. 
alice.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{} bob.mockNotifier.oneConfChannel <- &chainntnfs.TxConfirmation{} @@ -885,10 +915,14 @@ func TestFundingManagerNormalWorkflow(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -970,7 +1004,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) { } // Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Alice should still be markedOpen assertDatabaseState(t, alice, fundingOutPoint, markedOpen) @@ -987,7 +1023,9 @@ func TestFundingManagerRestartBehavior(t *testing.T) { return fmt.Errorf("intentional error in SendAnnouncement") } - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // The state should now be fundingLockedSent assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) @@ -1088,7 +1126,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) { } // Bob will send funding locked to Alice - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Alice should still be markedOpen assertDatabaseState(t, alice, fundingOutPoint, markedOpen) @@ -1131,7 
+1171,9 @@ func TestFundingManagerOfflinePeer(t *testing.T) { close(con) // This should make Alice send the fundingLocked. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // The state should now be fundingLockedSent assertDatabaseState(t, alice, fundingOutPoint, fundingLockedSent) @@ -1168,6 +1210,212 @@ func TestFundingManagerOfflinePeer(t *testing.T) { assertNoChannelState(t, alice, bob, fundingOutPoint) } +// TestFundingManagerPeerTimeoutAfterInitFunding checks that the zombie sweeper +// will properly clean up a zombie reservation that times out after the +// initFundingMsg has been handled. +func TestFundingManagerPeerTimeoutAfterInitFunding(t *testing.T) { + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Create a funding request and start the workflow. + errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: 500000, + pushAmt: lnwire.NewMSatFromSatoshis(0), + private: false, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. 
+ var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + _, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Alice should have a new pending reservation. + assertNumPendingReservations(t, alice, bobPubKey, 1) + + // Make sure Alice's reservation times out and then run her zombie sweeper. + time.Sleep(1 * time.Millisecond) + go alice.fundingMgr.pruneZombieReservations() + + // Alice should have sent an Error message to Bob. + assertErrorSent(t, alice.msgChan) + + // Alice's zombie reservation should have been pruned. + assertNumPendingReservations(t, alice, bobPubKey, 0) +} + +// TestFundingManagerPeerTimeoutAfterFundingOpen checks that the zombie sweeper +// will properly clean up a zombie reservation that times out after the +// fundingOpenMsg has been handled. +func TestFundingManagerPeerTimeoutAfterFundingOpen(t *testing.T) { + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Create a funding request and start the workflow. + errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: 500000, + pushAmt: lnwire.NewMSatFromSatoshis(0), + private: false, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. 
+ var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Alice should have a new pending reservation. + assertNumPendingReservations(t, alice, bobPubKey, 1) + + // Let Bob handle the init message. + bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + + // Bob should answer with an AcceptChannel. + assertFundingMsgSent(t, bob.msgChan, "AcceptChannel") + + // Bob should have a new pending reservation. + assertNumPendingReservations(t, bob, alicePubKey, 1) + + // Make sure Bob's reservation times out and then run his zombie sweeper. + time.Sleep(1 * time.Millisecond) + go bob.fundingMgr.pruneZombieReservations() + + // Bob should have sent an Error message to Alice. + assertErrorSent(t, bob.msgChan) + + // Bob's zombie reservation should have been pruned. + assertNumPendingReservations(t, bob, alicePubKey, 0) +} + +// TestFundingManagerPeerTimeoutAfterFundingAccept checks that the zombie sweeper +// will properly clean up a zombie reservation that times out after the +// fundingAcceptMsg has been handled. +func TestFundingManagerPeerTimeoutAfterFundingAccept(t *testing.T) { + alice, bob := setupFundingManagers(t) + defer tearDownFundingManagers(t, alice, bob) + + // We will consume the channel updates as we go, so no buffering is needed. + updateChan := make(chan *lnrpc.OpenStatusUpdate) + + // Create a funding request and start the workflow. 
+ errChan := make(chan error, 1) + initReq := &openChanReq{ + targetPubkey: bob.privKey.PubKey(), + chainHash: *activeNetParams.GenesisHash, + localFundingAmt: 500000, + pushAmt: lnwire.NewMSatFromSatoshis(0), + private: false, + updates: updateChan, + err: errChan, + } + + alice.fundingMgr.initFundingWorkflow(bobAddr, initReq) + + // Alice should have sent the OpenChannel message to Bob. + var aliceMsg lnwire.Message + select { + case aliceMsg = <-alice.msgChan: + case err := <-initReq.err: + t.Fatalf("error init funding workflow: %v", err) + case <-time.After(time.Second * 5): + t.Fatalf("alice did not send OpenChannel message") + } + + openChannelReq, ok := aliceMsg.(*lnwire.OpenChannel) + if !ok { + errorMsg, gotError := aliceMsg.(*lnwire.Error) + if gotError { + t.Fatalf("expected OpenChannel to be sent "+ + "from bob, instead got error: %v", + lnwire.ErrorCode(errorMsg.Data[0])) + } + t.Fatalf("expected OpenChannel to be sent from "+ + "alice, instead got %T", aliceMsg) + } + + // Alice should have a new pending reservation. + assertNumPendingReservations(t, alice, bobPubKey, 1) + + // Let Bob handle the init message. + bob.fundingMgr.processFundingOpen(openChannelReq, aliceAddr) + + // Bob should answer with an AcceptChannel. + acceptChannelResponse := assertFundingMsgSent( + t, bob.msgChan, "AcceptChannel", + ).(*lnwire.AcceptChannel) + + // Bob should have a new pending reservation. + assertNumPendingReservations(t, bob, alicePubKey, 1) + + // Forward the response to Alice. + alice.fundingMgr.processFundingAccept(acceptChannelResponse, bobAddr) + + // Alice responds with a FundingCreated message. + assertFundingMsgSent(t, alice.msgChan, "FundingCreated") + + // Make sure Alice's reservation times out and then run her zombie sweeper. + time.Sleep(1 * time.Millisecond) + go alice.fundingMgr.pruneZombieReservations() + + // Alice should have sent an Error message to Bob. 
+ assertErrorSent(t, alice.msgChan) + + // Alice's zombie reservation should have been pruned. + assertNumPendingReservations(t, alice, bobPubKey, 0) +} + func TestFundingManagerFundingTimeout(t *testing.T) { alice, bob := setupFundingManagers(t) defer tearDownFundingManagers(t, alice, bob) @@ -1208,7 +1456,7 @@ func TestFundingManagerFundingTimeout(t *testing.T) { } // TestFundingManagerFundingNotTimeoutInitiator checks that if the user was -// the channel initiator, that it does not timeout when the lnd restarts +// the channel initiator, that it does not timeout when the lnd restarts. func TestFundingManagerFundingNotTimeoutInitiator(t *testing.T) { alice, bob := setupFundingManagers(t) @@ -1297,10 +1545,14 @@ func TestFundingManagerReceiveFundingLockedTwice(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1394,10 +1646,14 @@ func TestFundingManagerRestartAfterChanAnn(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. 
- fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1464,10 +1720,14 @@ func TestFundingManagerRestartAfterReceivingFundingLocked(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1530,10 +1790,14 @@ func TestFundingManagerPrivateChannel(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. - fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) @@ -1606,10 +1870,14 @@ func TestFundingManagerPrivateRestart(t *testing.T) { // After the funding transaction is mined, Alice will send // fundingLocked to Bob. 
- fundingLockedAlice := checkNodeSendingFundingLocked(t, alice) + fundingLockedAlice := assertFundingMsgSent( + t, alice.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // And similarly Bob will send funding locked to Alice. - fundingLockedBob := checkNodeSendingFundingLocked(t, bob) + fundingLockedBob := assertFundingMsgSent( + t, bob.msgChan, "FundingLocked", + ).(*lnwire.FundingLocked) // Check that the state machine is updated accordingly assertFundingLockedSent(t, alice, bob, fundingOutPoint) diff --git a/lnd.go b/lnd.go index b2685f90..9b2f5088 100644 --- a/lnd.go +++ b/lnd.go @@ -414,6 +414,8 @@ func lndMain() error { // channel bandwidth. return uint16(lnwallet.MaxHTLCNumber / 2) }, + ZombieSweeperInterval: 1 * time.Minute, + ReservationTimeout: 10 * time.Minute, }) if err != nil { return err diff --git a/lnwallet/reservation.go b/lnwallet/reservation.go index 21a365bd..2865c74b 100644 --- a/lnwallet/reservation.go +++ b/lnwallet/reservation.go @@ -58,8 +58,8 @@ type InputScript struct { } // ChannelReservation represents an intent to open a lightning payment channel -// a counterparty. The funding processes from reservation to channel opening is -// a 3-step process. In order to allow for full concurrency during the +// with a counterparty. The funding process from reservation to channel opening +// is a 3-step process. In order to allow for full concurrency during the // reservation workflow, resources consumed by a contribution are "locked" // themselves. This prevents a number of race conditions such as two funding // transactions double-spending the same input. A reservation can also be @@ -69,12 +69,12 @@ type InputScript struct { // The reservation workflow consists of the following three steps: // 1. lnwallet.InitChannelReservation // * One requests the wallet to allocate the necessary resources for a -// channel reservation. These resources a put in limbo for the lifetime -// of a reservation. 
-// * Once completed the reservation will have the wallet's contribution -// accessible via the .OurContribution() method. This contribution -// contains the necessary items to allow the remote party to build both -// the funding, and commitment transactions. +// channel reservation. These resources are put in limbo for the lifetime +// of a reservation. +// * Once completed the reservation will have the wallet's contribution +// accessible via the .OurContribution() method. This contribution +// contains the necessary items to allow the remote party to build both +// the funding, and commitment transactions. // 2. ChannelReservation.ProcessContribution/ChannelReservation.ProcessSingleContribution // * The counterparty presents their contribution to the payment channel. // This allows us to build the funding, and commitment transactions diff --git a/lnwallet/wallet.go b/lnwallet/wallet.go index 61f09c2f..f64efc95 100644 --- a/lnwallet/wallet.go +++ b/lnwallet/wallet.go @@ -52,9 +52,8 @@ func (e *ErrInsufficientFunds) Error() string { // will be created in order to track the lifetime of this pending channel. // Outputs selected will be 'locked', making them unavailable, for any other // pending reservations. Therefore, all channels in reservation limbo will be -// periodically after a timeout period in order to avoid "exhaustion" attacks. -// -// TODO(roasbeef): zombie reservation sweeper goroutine. +// periodically timed out after an idle period in order to avoid "exhaustion" +// attacks. type initFundingReserveMsg struct { // chainHash denotes that chain to be used to ultimately open the // target channel. @@ -261,8 +260,6 @@ type LightningWallet struct { fundingLimbo map[uint64]*ChannelReservation nextFundingID uint64 limboMtx sync.RWMutex - // TODO(roasbeef): zombie garbage collection routine to solve - // lost-object/starvation problem/attack. // lockedOutPoints is a set of the currently locked outpoint. 
This // information is kept in order to provide an easy way to unlock all @@ -366,7 +363,7 @@ func (l *LightningWallet) ActiveReservations() []*ChannelReservation { } // requestHandler is the primary goroutine(s) responsible for handling, and -// dispatching relies to all messages. +// dispatching replies to all messages. func (l *LightningWallet) requestHandler() { out: for { @@ -403,14 +400,14 @@ out: // successful, a ChannelReservation containing our completed contribution is // returned. Our contribution contains all the items necessary to allow the // counterparty to build the funding transaction, and both versions of the -// commitment transaction. Otherwise, an error occurred a nil pointer along with -// an error are returned. +// commitment transaction. Otherwise, if an error occurred, a nil pointer along +// with an error is returned. // // Once a ChannelReservation has been obtained, two additional steps must be // processed before a payment channel can be considered 'open'. The second step // validates, and processes the counterparty's channel contribution. The third, // and final step verifies all signatures for the inputs of the funding -// transaction, and that the signature we records for our version of the +// transaction, and that the signature we record for our version of the // commitment transaction is valid. func (l *LightningWallet) InitChannelReservation( capacity, ourFundAmt btcutil.Amount, pushMSat lnwire.MilliSatoshi, @@ -579,7 +576,7 @@ func (l *LightningWallet) handleFundingReserveRequest(req *initFundingReserveMsg reservation.partialState.RevocationProducer = producer reservation.ourContribution.ChannelConstraints = l.Cfg.DefaultConstraints - // TODO(roasbeef): turn above into: initContributio() + // TODO(roasbeef): turn above into: initContribution() // Create a limbo and record entry for this newly pending funding // request.