From a0cc1d1b2dbd671765df1026320b4913fd3b39e7 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 18 Jan 2018 14:06:38 -0800 Subject: [PATCH] breacharbiter: utilize new channel on-chain event stream to watch for breaches MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we modify the breach arbiter to no longer require holding a channel object directly in order to receive new notifications about possible breaches. Instead, we’ll contact the chain arbiter to request a new channel event subscription. As a result of the new architecture, we no longer need to receive a handoff once the new channel comes online, as the chainWatcher will always be active and watching the channel until it’s been closed. --- breacharbiter.go | 134 ++++++++++++++--------------------------------- server.go | 19 ++----- 2 files changed, 43 insertions(+), 110 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 3f9b67cb..96cb93d8 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -12,6 +12,7 @@ import ( "github.com/davecgh/go-spew/spew" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/contractcourt" "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/blockchain" @@ -45,14 +46,6 @@ type BreachConfig struct { // a close type to be included in the channel close summary. CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType) - // UpdateCloseSignal allows the breach arbiter to notify the - // ChainArbitrator that a set of new signals for the unilateral closing - // of a channel is now available. This ensures that ifa channel hasn't - // had any updates since it was live, then we're still able to act on - // on-chain events. - UpdateCloseSignal func(*wire.OutPoint, - chan *lnwallet.UnilateralCloseSummary) error - // DB provides access to the user's channels, allowing the breach // arbiter to determine the current state of a user's channels, and how // it should respond to channel closure. @@ -74,6 +67,11 @@ type BreachConfig struct { // transaction to the network. PublishTransaction func(*wire.MsgTx) error + // SubscribeChannelEvents is a function closure that allows goroutines + // within the breachArbiter to be notified of potential on-chain events + // related to the channels they're watching. + SubscribeChannelEvents func(wire.OutPoint) (*contractcourt.ChainEventSubscription, error) + // Signer is used by the breach arbiter to generate sweep transactions, // which move coins from previously open channels back to the user's // wallet. @@ -113,11 +111,6 @@ type breachArbiter struct { // use this to communicate with the main contractObserver goroutine. breachedContracts chan *retributionInfo - // newContracts is a channel which is used by outside subsystems to - // notify the breachArbiter of a new contract (a channel) that should - // be watched. - newContracts chan *lnwallet.LightningChannel - // settledContracts is a channel by outside subsystems to notify // the breachArbiter that a channel has peacefully been closed. Once a // channel has been closed the arbiter no longer needs to watch for @@ -132,11 +125,9 @@ type breachArbiter struct { // its dependent objects. func newBreachArbiter(cfg *BreachConfig) *breachArbiter { return &breachArbiter{ - cfg: cfg, - + cfg: cfg, breachObservers: make(map[wire.OutPoint]chan struct{}), breachedContracts: make(chan *retributionInfo), - newContracts: make(chan *lnwallet.LightningChannel), settledContracts: make(chan *wire.OutPoint), quit: make(chan struct{}), } @@ -215,27 +206,26 @@ func (b *breachArbiter) Start() error { // retribution store, we skip it to avoid creating a breach observer. // Resolving breached channels will be handled later by spawning an // exactRetribution task for each. - channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) + channelsToWatch := make([]*contractcourt.ChainEventSubscription, 0, nActive) for _, chanState := range activeChannels { // If this channel was previously breached, we skip it here to // avoid creating a breach observer, as we can go straight to // the task of exacting retribution. - if _, ok := breachRetInfos[chanState.FundingOutpoint]; ok { + chanPoint := chanState.FundingOutpoint + if _, ok := breachRetInfos[chanPoint]; ok { continue } - // Initialize active channel from persisted channel state. - channel, err := lnwallet.NewLightningChannel(nil, - b.cfg.Notifier, nil, chanState) + // For each active channels, we'll request a chain event + // subscription form the system that's overseeing the channel. + chainEvents, err := b.cfg.SubscribeChannelEvents(chanPoint) if err != nil { - brarLog.Errorf("unable to load channel from "+ - "disk: %v", err) return err } - // Finally, add this channel to breach arbiter's list of - // channels to watch. - channelsToWatch = append(channelsToWatch, channel) + // Finally, add this channel event stream to breach arbiter's + // list of channels to watch. + channelsToWatch = append(channelsToWatch, chainEvents) } // Spawn the exactRetribution tasks to monitor and resolve any breaches @@ -297,34 +287,24 @@ func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { // channel into the daemon's wallet. // // NOTE: This MUST be run as a goroutine. -func (b *breachArbiter) contractObserver( - activeChannels []*lnwallet.LightningChannel) { +func (b *breachArbiter) contractObserver(channelEvents []*contractcourt.ChainEventSubscription) { defer b.wg.Done() brarLog.Infof("Starting contract observer with %v active channels", - len(activeChannels)) + len(channelEvents)) // For each active channel found within the database, we launch a // detected breachObserver goroutine for that channel and also track // the new goroutine within the breachObservers map so we can cancel it // later if necessary. - for _, channel := range activeChannels { + for _, channelEvent := range channelEvents { settleSignal := make(chan struct{}) - chanPoint := channel.ChanPoint - b.breachObservers[*chanPoint] = settleSignal - - // Before we'll launch our breach observe, we'll send this - // latest set of contract signals to the ChainArbitrator. - // - // TODO(roasbeef): just move now? - err := b.cfg.UpdateCloseSignal(chanPoint, channel.UnilateralClose) - if err != nil { - brarLog.Errorf("unable to update close signals: %v", err) - } + chanPoint := channelEvent.ChanPoint + b.breachObservers[chanPoint] = settleSignal b.wg.Add(1) - go b.breachObserver(channel, settleSignal) + go b.breachObserver(channelEvent, settleSignal) } // TODO(roasbeef): need to ensure currentHeight passed in doesn't @@ -363,37 +343,6 @@ out: delete(b.breachObservers, breachInfo.chanPoint) - case contract := <-b.newContracts: - // A new channel has just been opened within the - // daemon, so we launch a new breachObserver to handle - // the detection of attempted contract breaches. - settleSignal := make(chan struct{}) - chanPoint := contract.ChanPoint - - // If the contract is already being watched, then an - // additional send indicates we have a stale version of - // the contract. So we'll cancel active watcher - // goroutine to create a new instance with the latest - // contract reference. - if oldSignal, ok := b.breachObservers[*chanPoint]; ok { - brarLog.Infof("ChannelPoint(%v) is now live, "+ - "abandoning state contract for live "+ - "version", chanPoint) - close(oldSignal) - } - - b.breachObservers[*chanPoint] = settleSignal - - brarLog.Debugf("New contract detected, launching " + - "breachObserver") - - b.wg.Add(1) - go b.breachObserver(contract, settleSignal) - - // TODO(roasbeef): add doneChan to signal to peer - // continue * peer send over to us on - // loadActiveChanenls, sync until we're aware so no - // state transitions case chanPoint := <-b.settledContracts: // A new channel has been closed either unilaterally or // cooperatively, as a result we no longer need a @@ -573,12 +522,18 @@ func (b *breachArbiter) exactRetribution( // generated due to the breach of channel contract. The funds will be swept // only after the breaching transaction receives a necessary number of // confirmations. -func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, +func (b *breachArbiter) breachObserver( + chainEvents *contractcourt.ChainEventSubscription, settleSignal chan struct{}) { - defer b.wg.Done() + defer func() { + b.wg.Done() + chainEvents.Cancel() + }() - chanPoint := contract.ChanPoint + chanPoint := chainEvents.ChanPoint + + // TODO(roasbeef): needs to get the signals from the arb!!! brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", chanPoint) @@ -587,13 +542,11 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // A read from this channel indicates that the contract has been // settled cooperatively so we exit as our duties are no longer needed. case <-settleSignal: - contract.CancelObserver() - contract.Stop() return // The channel has been closed by a normal means: force closing with // the latest commitment transaction. - case <-contract.UnilateralCloseSignal: + case <-chainEvents.UnilateralClosure: // Launch a goroutine to cancel out this contract within the // breachArbiter's main goroutine. @@ -602,18 +555,17 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, defer b.wg.Done() select { - case b.settledContracts <- chanPoint: + case b.settledContracts <- &chanPoint: case <-b.quit: } }() - b.cfg.CloseLink(chanPoint, htlcswitch.CloseBreach) - contract.Stop() + b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) // A read from this channel indicates that a channel breach has been // detected! So we notify the main coordination goroutine with the // information needed to bring the counterparty to justice. - case breachInfo := <-contract.ContractBreach: + case breachInfo := <-chainEvents.ContractBreach: brarLog.Warnf("REVOKED STATE #%v FOR ChannelPoint(%v) "+ "broadcast, REMOTE PEER IS DOING SOMETHING "+ "SKETCHY!!!", breachInfo.RevokedStateNum, @@ -623,7 +575,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // breached in order to ensure any incoming or outgoing // multi-hop HTLCs aren't sent over this link, nor any other // links associated with this peer. - b.cfg.CloseLink(chanPoint, htlcswitch.CloseBreach) + b.cfg.CloseLink(&chanPoint, htlcswitch.CloseBreach) // TODO(roasbeef): need to handle case of remote broadcast // mid-local initiated state-transition, possible @@ -632,7 +584,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // Using the breach information provided by the wallet and the // channel snapshot, construct the retribution information that // will be persisted to disk. - retInfo := newRetributionInfo(chanPoint, breachInfo) + retInfo := newRetributionInfo(&chanPoint, breachInfo) // Persist the pending retribution state to disk. err := b.cfg.Store.Add(retInfo) @@ -646,21 +598,13 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // the ack is successful, the close observer will mark the // channel as pending-closed in the channeldb. select { - case breachInfo.Err <- err: + case chainEvents.ProcessACK <- struct{}{}: // Bail if we failed to persist retribution info. if err != nil { return } - case <-contract.ObserverQuit(): - // If the close observer has already exited, it will - // never read the acknowledgment, so we exit. - return - case <-b.quit: - // Cancel the close observer if the breach arbiter is - // shutting down, dropping the acknowledgment. - contract.CancelObserver() return } @@ -672,8 +616,6 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, } case <-b.quit: - contract.Stop() - contract.CancelObserver() return } } diff --git a/server.go b/server.go index 283e1ba6..820dc315 100644 --- a/server.go +++ b/server.go @@ -409,20 +409,11 @@ func newServer(listenAddrs []string, chanDB *channeldb.DB, cc *chainControl, GenSweepScript: func() ([]byte, error) { return newSweepPkScript(cc.wallet) }, - Notifier: cc.chainNotifier, - PublishTransaction: cc.wallet.PublishTransaction, - Signer: cc.wallet.Cfg.Signer, - Store: newRetributionStore(chanDB), - UpdateCloseSignal: func(op *wire.OutPoint, - ucs chan *lnwallet.UnilateralCloseSummary) error { - - signals := &contractcourt.ContractSignals{ - HtlcUpdates: make(chan []channeldb.HTLC), - UniCloseSignal: ucs, - } - - return s.chainArb.UpdateContractSignals(*op, signals) - }, + Notifier: cc.chainNotifier, + PublishTransaction: cc.wallet.PublishTransaction, + SubscribeChannelEvents: s.chainArb.SubscribeChannelEvents, + Signer: cc.wallet.Cfg.Signer, + Store: newRetributionStore(chanDB), }) // Create the connection manager which will be responsible for