From 75e45b830beab637fe7d844bb9c52f66e919dcf5 Mon Sep 17 00:00:00 2001 From: PaddyQuinn Date: Mon, 12 Mar 2018 21:58:51 -0400 Subject: [PATCH] funding: implement reservation zombie sweeper Before previous commits were squashed into this commit, each zombie reservation was cleaned up individually when it timed out. However, this made the code more complex: each reservation had its own timer, which would have had to be cancelled any time the reservation was cancelled, and that would have been harder to maintain. With this commit, zombie reservations are instead cleaned up by a zombie sweeper that is triggered by a ticker, which makes the code more maintainable. --- fundingmanager.go | 101 ++++++++++++++++++++++++++++++++++++++++++---- lnd.go | 2 + 2 files changed, 95 insertions(+), 8 deletions(-) diff --git a/fundingmanager.go b/fundingmanager.go index 9e29de16..41177d58 100644 --- a/fundingmanager.go +++ b/fundingmanager.go @@ -70,10 +70,29 @@ type reservationWithCtx struct { chanAmt btcutil.Amount + lastUpdated time.Time + updates chan *lnrpc.OpenStatusUpdate err chan error } +// isLocked checks the reservation's timestamp to determine whether it is locked. +func (r *reservationWithCtx) isLocked() bool { + // The time zero value represents a locked reservation. + return r.lastUpdated.IsZero() +} + +// lock locks the reservation from zombie pruning by setting its timestamp to the +// zero value. +func (r *reservationWithCtx) lock() { + r.lastUpdated = time.Time{} +} + +// updateTimestamp updates the reservation's timestamp with the current time. +func (r *reservationWithCtx) updateTimestamp() { + r.lastUpdated = time.Now() +} + // initFundingMsg is sent by an outside subsystem to the funding manager in // order to kick off a funding workflow with a specified target peer. 
The // original request which defines the parameters of the funding workflow are @@ -266,6 +285,14 @@ type fundingConfig struct { // discovered short channel ID of a formerly pending channel to outside // sub-systems. ReportShortChanID func(wire.OutPoint, lnwire.ShortChannelID) error + + // ZombieSweeperInterval is the periodic time interval in which the zombie + // sweeper is run. + ZombieSweeperInterval time.Duration + + // ReservationTimeout is the length of idle time that must pass before a + // reservation is considered a zombie. + ReservationTimeout time.Duration } // fundingManager acts as an orchestrator/bridge between the wallet's @@ -307,7 +334,7 @@ type fundingManager struct { signedReservations map[lnwire.ChannelID][32]byte // resMtx guards both of the maps above to ensure that all access is - // goroutine stafe. + // goroutine safe. resMtx sync.RWMutex // fundingMsgs is a channel which receives wrapped wire messages @@ -764,6 +791,9 @@ func (f *fundingManager) failFundingFlow(peer *btcec.PublicKey, func (f *fundingManager) reservationCoordinator() { defer f.wg.Done() + zombieSweepTicker := time.NewTicker(f.cfg.ZombieSweeperInterval) + defer zombieSweepTicker.Stop() + for { select { @@ -786,6 +816,9 @@ func (f *fundingManager) reservationCoordinator() { case req := <-f.fundingRequests: f.handleInitFundingMsg(req) + case <-zombieSweepTicker.C: + f.pruneZombieReservations() + case req := <-f.queries: switch msg := req.(type) { case *pendingChansReq: @@ -941,20 +974,24 @@ func (f *fundingManager) handleFundingOpen(fmsg *fundingOpenMsg) { amt, msg.PushAmount) // Once the reservation has been created successfully, we add it to - // this peers map of pending reservations to track this particular + // this peer's map of pending reservations to track this particular // reservation until either abort or completion. 
f.resMtx.Lock() if _, ok := f.activeReservations[peerIDKey]; !ok { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][msg.PendingChannelID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ reservation: reservation, chanAmt: amt, err: make(chan error, 1), peerAddress: fmsg.peerAddress, } + f.activeReservations[peerIDKey][msg.PendingChannelID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the fundingOpenMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(amt) @@ -1063,6 +1100,9 @@ func (f *fundingManager) handleFundingAccept(fmsg *fundingAcceptMsg) { return } + // Update the timestamp once the fundingAcceptMsg has been handled. + defer resCtx.updateTimestamp() + fndgLog.Infof("Recv'd fundingResponse for pendingID(%x)", pendingChanID[:]) // We'll also specify the responder's preference for the number of @@ -1304,6 +1344,13 @@ func (f *fundingManager) handleFundingCreated(fmsg *fundingCreatedMsg) { f.localDiscoverySignals[channelID] = make(chan struct{}) f.localDiscoveryMtx.Unlock() + // At this point we have sent our last funding message to the + // initiating peer before the funding transaction will be broadcast. + // The only thing left to do before we can delete this reservation + // is wait for the funding transaction. Lock the reservation so it + // is not pruned by the zombie sweeper. + resCtx.lock() + // With this last message, our job as the responder is now complete. // We'll wait for the funding transaction to reach the specified number // of confirmations, then start normal operations. @@ -1454,6 +1501,12 @@ func (f *fundingManager) handleFundingSigned(fmsg *fundingSignedMsg) { }, } + // At this point we have broadcast the funding transaction and done all + // necessary processing. 
The only thing left to do before we can delete + // this reservation is wait for the funding transaction. Lock the + // reservation so it is not pruned by the zombie sweeper. + resCtx.lock() + f.wg.Add(1) go func() { defer f.wg.Done() @@ -2454,15 +2507,19 @@ func (f *fundingManager) handleInitFundingMsg(msg *initFundingMsg) { f.activeReservations[peerIDKey] = make(pendingChannels) } - f.activeReservations[peerIDKey][chanID] = &reservationWithCtx{ + resCtx := &reservationWithCtx{ chanAmt: capacity, reservation: reservation, peerAddress: msg.peerAddress, updates: msg.updates, err: msg.err, } + f.activeReservations[peerIDKey][chanID] = resCtx f.resMtx.Unlock() + // Update the timestamp once the initFundingMsg has been handled. + defer resCtx.updateTimestamp() + // Using the RequiredRemoteDelay closure, we'll compute the remote CSV // delay we require given the total amount of funds within the channel. remoteCsvDelay := f.cfg.RequiredRemoteDelay(capacity) @@ -2544,7 +2601,7 @@ func (f *fundingManager) waitUntilChannelOpen(targetChan lnwire.ChannelID) { } } -// processErrorGeneric sends a message to the fundingManager allowing it to +// processFundingError sends a message to the fundingManager allowing it to // process the occurred generic error. func (f *fundingManager) processFundingError(err *lnwire.Error, peerAddress *lnwire.NetAddress) { @@ -2556,7 +2613,7 @@ func (f *fundingManager) processFundingError(err *lnwire.Error, } } -// handleErrorGenericMsg process the error which was received from remote peer, +// handleErrorMsg processes the error which was received from remote peer, // depending on the type of error we should do different clean up steps and // inform the user about it. 
func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { @@ -2589,7 +2646,7 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { lnErr.ToGrpcCode(), string(protocolErr.Data), ) } else { - // Otherwise, we'll attempt tto display just the error code + // Otherwise, we'll attempt to display just the error code // itself. resCtx.err <- grpc.Errorf( lnErr.ToGrpcCode(), lnErr.String(), @@ -2602,7 +2659,35 @@ func (f *fundingManager) handleErrorMsg(fmsg *fundingErrorMsg) { } } -// cancelReservationCtx do all needed work in order to securely cancel the +// pruneZombieReservations loops through all pending reservations and fails the +// funding flow for any reservations that have not been updated for longer than +// ReservationTimeout and are not locked waiting for the funding transaction. +func (f *fundingManager) pruneZombieReservations() { + zombieReservations := make(pendingChannels) + + f.resMtx.RLock() + for _, pendingReservations := range f.activeReservations { + for pendingChanID, resCtx := range pendingReservations { + if resCtx.isLocked() { + continue + } + + if time.Since(resCtx.lastUpdated) > f.cfg.ReservationTimeout { + zombieReservations[pendingChanID] = resCtx + } + } + } + f.resMtx.RUnlock() + + for pendingChanID, resCtx := range zombieReservations { + err := fmt.Errorf("reservation timed out waiting for peer (peerID:%v, "+ + "chanID:%x)", resCtx.peerAddress.IdentityKey, pendingChanID[:]) + fndgLog.Warnf(err.Error()) + f.failFundingFlow(resCtx.peerAddress.IdentityKey, pendingChanID, err) + } +} + +// cancelReservationCtx does all needed work in order to securely cancel the // reservation. func (f *fundingManager) cancelReservationCtx(peerKey *btcec.PublicKey, pendingChanID [32]byte) (*reservationWithCtx, error) { diff --git a/lnd.go b/lnd.go index b2685f90..9b2f5088 100644 --- a/lnd.go +++ b/lnd.go @@ -414,6 +414,8 @@ func lndMain() error { // channel bandwidth. 
return uint16(lnwallet.MaxHTLCNumber / 2) }, + ZombieSweeperInterval: 1 * time.Minute, + ReservationTimeout: 10 * time.Minute, }) if err != nil { return err