contractcourt: add new chainWatcher struct

In this commit, we add a new struct to the package, the chainWatcher.
The duty of this struct is to replace the functionality that was
previously implemented by the closeObserver of each channel. Rather
than the source of notification being tied to the lifetime of a
particular object, it’s now delegated to a persistent object that will
be around for the entire lifetime of the channel (until it’s closed).
This will serve to greatly simplify the code, and eliminate a large
class of bugs.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-18 13:54:52 -08:00
parent 5bbe126c34
commit 0e14ac2063
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
1 changed files with 462 additions and 0 deletions

View File

@ -0,0 +1,462 @@
package contractcourt
import (
"fmt"
"sync"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/lightningnetwork/lnd/channeldb"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/wire"
)
// ChainEventSubscription is a struct that houses a subscription to be notified
// for any on-chain events related to a channel. There are three types of
// possible on-chain events: a cooperative channel closure, a unilateral
// channel closure, and a channel breach. The fourth type: a force close is
// locally initiated, so we don't provide any event stream for said event.
type ChainEventSubscription struct {
// ChanPoint is that channel that chain events will be dispatched fo.
ChanPoint wire.OutPoint
// UnilateralClosure is a channel that will be sent upon in the event that
// the remote party broadcasts their latest version of the commitment
// transaction.
UnilateralClosure chan *lnwallet.UnilateralCloseSummary
// CooperativeClosure is a signal that will be sent upon once a cooperative
// channel closure has been detected.
//
// TODO(roasbeef): or something else
CooperativeClosure chan struct{}
// ContractBreach is a channel that will be sent upon if we detect a
// contract breach. The struct sent across the channel contains all the
// material required to bring the cheating channel peer to justice.
ContractBreach chan *lnwallet.BreachRetribution
// ProcessACK is a channel that'll be used by the chainWatcher to
// synchronize dispatch and processing of the notification with the act
// of updating the state of the channel on disk. This ensures that the
// event can be reliably handed off.
ProcessACK chan struct{}
// Cancel cancels the subscription to the event stream for a particular
// channel. This method should be called once the caller no longer needs to
// be notified of any on-chain events for a particular channel.
Cancel func()
}
// chainWatcher is a system that's assigned to every active channel. The duty
// of this system is to watch the chain for spends of the channels chan point.
// If a spend is detected then with chain watcher will notify all subscribers
// that the channel has been closed, and also give them the materials necessary
// to sweep the funds of the channel on chain eventually.
type chainWatcher struct {
quit chan struct{}
wg sync.WaitGroup
// chanState is a snapshot of the persistent state of the channel that
// we're watching. In the event of an on-chain event, we'll query the
// database to ensure that we act using the most up to date state.
chanState *channeldb.OpenChannel
// stateHintObfuscator is a 48-bit state hint that's used to obfsucate
// the current state number on the commitment transactions.
stateHintObfuscator [lnwallet.StateHintSize]byte
// notifier is a reference to the channel notifier that we'll use to be
// notified of output spends and when transactions are confirmed.
notifier chainntnfs.ChainNotifier
// pCache is a reference to the shared preimage cache. We'll use this
// to see if we can settle any incoming HTLC's during a remote
// commitment close event.
pCache WitnessBeacon
// signer is the main signer instances that will be responsible for
// signing any HTLC and commitment transaction generated by the state
// machine.
signer lnwallet.Signer
// All the fields below are protected by this mutex.
sync.RWMutex
// clientID is an ephemeral counter used to keep track of each
// individual client subscription.
clientID uint64
// clientSubscriptions is a map that keeps track of all the active
// client subscriptions for events related to this channel.
clientSubscriptions map[uint64]*ChainEventSubscription
}
// newChainWatcher returns a new instance of a chainWatcher for a channel given
// the chan point to watch, and also a notifier instance that will allow us to
// detect on chain events.
func newChainWatcher(chanState *channeldb.OpenChannel,
notifier chainntnfs.ChainNotifier, pCache WitnessBeacon,
signer lnwallet.Signer) (*chainWatcher, error) {
// In order to be able to detect the nature of a potential channel
// closure we'll need to reconstruct the state hint bytes used to
// obfuscate the commitment state number encoded in the lock time and
// sequence fields.
var stateHint [lnwallet.StateHintSize]byte
if chanState.IsInitiator {
stateHint = lnwallet.DeriveStateHintObfuscator(
chanState.LocalChanCfg.PaymentBasePoint,
chanState.RemoteChanCfg.PaymentBasePoint,
)
} else {
stateHint = lnwallet.DeriveStateHintObfuscator(
chanState.RemoteChanCfg.PaymentBasePoint,
chanState.LocalChanCfg.PaymentBasePoint,
)
}
return &chainWatcher{
chanState: chanState,
stateHintObfuscator: stateHint,
notifier: notifier,
pCache: pCache,
signer: signer,
quit: make(chan struct{}),
clientSubscriptions: make(map[uint64]*ChainEventSubscription),
}, nil
}
// Start starts all goroutines that the chainWatcher needs to perform its
// duties.
func (c *chainWatcher) Start() error {
log.Debugf("Starting chain watcher for ChannelPoint(%v)",
c.chanState.FundingOutpoint)
// First, we'll register for a notification to be dispatched if the
// funding output is spent.
fundingOut := &c.chanState.FundingOutpoint
// As a height hint, we'll try to use the opening height, but if the
// channel isn't yet open, then we'll use the height it was broadcast
// at.
heightHint := c.chanState.ShortChanID.BlockHeight
if heightHint == 0 {
heightHint = c.chanState.FundingBroadcastHeight
}
spendNtfn, err := c.notifier.RegisterSpendNtfn(
fundingOut, heightHint,
)
if err != nil {
return err
}
// With the spend notification obtained, we'll now dispatch the
// closeObserver which will properly react to any changes.
c.wg.Add(1)
go c.closeObserver(spendNtfn)
return nil
}
// Stop signals the close observer to gracefully exit.
func (c *chainWatcher) Stop() error {
close(c.quit)
c.wg.Wait()
return nil
}
// SubscribeChannelEvents returns a n active subscription to the set of channel
// events for the channel watched by this chain watcher. Once clients no longer
// require the subscription, they should call the Cancel() method to allow the
// watcher to regain those committed resources.
func (c *chainWatcher) SubscribeChannelEvents() *ChainEventSubscription {
c.Lock()
defer c.Unlock()
clientID := c.clientID
c.clientID++
log.Debugf("New ChainEventSubscription(id=%v) for ChannelPoint(%v)",
clientID, c.chanState.FundingOutpoint)
sub := &ChainEventSubscription{
ChanPoint: c.chanState.FundingOutpoint,
UnilateralClosure: make(chan *lnwallet.UnilateralCloseSummary, 1),
CooperativeClosure: make(chan struct{}, 1),
ContractBreach: make(chan *lnwallet.BreachRetribution, 1),
Cancel: func() {
c.Lock()
delete(c.clientSubscriptions, clientID)
c.Unlock()
return
},
}
c.clientSubscriptions[clientID] = sub
return sub
}
// closeObserver is a dedicated goroutine that will watch for any closes of the
// channel that it's watching on chain. In the event of an on-chain event, the
// close observer will assembled the proper materials required to claim the
// funds of the channel on-chain (if required), then dispatch these as
// notifications to all subscribers.
func (c *chainWatcher) closeObserver(spendNtfn *chainntnfs.SpendEvent) {
defer c.wg.Done()
log.Infof("Close observer for ChannelPoint(%v) active",
c.chanState.FundingOutpoint)
for {
select {
// We've detected a spend of the channel onchain! Depending on
// the type of spend, we'll act accordingly , so we'll examine
// the spending transaction to determine what we should do.
case commitSpend, ok := <-spendNtfn.Spend:
// If the channel was closed, then this means that the
// notifier exited, so we will as well.
if !ok {
return
}
// Otherwise, the remote party might have broadcast a
// prior revoked state...!!!
commitTxBroadcast := commitSpend.SpendingTx
localCommit, remoteCommit, err := c.chanState.LatestCommitments()
if err != nil {
log.Errorf("Unable to fetch channel state for "+
"chan_point=%v", c.chanState.FundingOutpoint)
return
}
// We'll not retrieve the latest sate of the revocation
// store so we can populate the information within the
// channel state object that we have.
//
// TODO(roasbeef): mutation is bad mkay
_, err = c.chanState.RemoteRevocationStore()
if err != nil {
log.Errorf("Unable to fetch revocation state for "+
"chan_point=%v", c.chanState.FundingOutpoint)
return
}
// If this is our commitment transaction, then we can
// exit here as we don't have any further processing we
// need to do (we can't cheat ourselves :p).
commitmentHash := localCommit.CommitTx.TxHash()
isOurCommitment := commitSpend.SpenderTxHash.IsEqual(
&commitmentHash,
)
if isOurCommitment {
return
}
// Next, we'll check to see if this is a cooperative
// channel closure or not. This is characterized by
//
// TODO(roasbeef): check to see if txid amongst those
// that we know are co-op channel closes
log.Warnf("Unprompted commitment broadcast for "+
"ChannelPoint(%v) ", c.chanState.FundingOutpoint)
// Decode the state hint encoded within the commitment
// transaction to determine if this is a revoked state
// or not.
obfuscator := c.stateHintObfuscator
broadcastStateNum := lnwallet.GetStateNumHint(
commitTxBroadcast, obfuscator,
)
remoteStateNum := remoteCommit.CommitHeight
switch {
// If state number spending transaction matches the
// current latest state, then they've initiated a
// unilateral close. So we'll trigger the unilateral
// close signal so subscribers can clean up the state
// as necessary.
//
// We'll also handle the case of the remote party
// broadcasting their commitment transaction which is
// one height above ours. This case can arise when we
// initiate a state transition, but the remote party
// has a fail crash _after_ accepting the new state,
// but _before_ sending their signature to us.
case broadcastStateNum >= remoteStateNum:
if err := c.dispatchRemoteClose(
commitSpend, *remoteCommit,
); err != nil {
log.Errorf("unable to handle remote "+
"close for chan_point=%v",
c.chanState.FundingOutpoint, err)
}
// If the state number broadcast is lower than the
// remote node's current un-revoked height, then
// THEY'RE ATTEMPTING TO VIOLATE THE CONTRACT LAID OUT
// WITHIN THE PAYMENT CHANNEL. Therefore we close the
// signal indicating a revoked broadcast to allow
// subscribers to
// swiftly dispatch justice!!!
case broadcastStateNum < remoteStateNum:
if err := c.dispatchContractBreach(
commitSpend, remoteCommit,
); err != nil {
log.Errorf("unable to handle channel "+
"breach for chan_point=%v: %v",
c.chanState.FundingOutpoint, err)
}
}
// Now that a spend has been detected, we've done our
// job, so we'll exit immediately.
return
// The chainWatcher has been signalled to exit, so we'll do so now.
case <-c.quit:
}
}
}
// dispatchRemoteClose processes a detected unilateral channel closure by the
// remote party. This function will prepare a UnilateralCloseSummary which will
// then be sent to any subscribers allowing them to resolve all our funds in
// the channel on chain. Once this close summary is prepared, all registered
// subscribers will receive a notification of this event.
func (c *chainWatcher) dispatchRemoteClose(commitSpend *chainntnfs.SpendDetail,
remoteCommit channeldb.ChannelCommitment) error {
log.Infof("Unilateral close of ChannelPoint(%v) "+
"detected", c.chanState.FundingOutpoint)
// First, we'll create a closure summary that contains all the
// materials required to let each subscriber sweep the funds in the
// channel on-chain.
uniClose, err := lnwallet.NewUnilateralCloseSummary(c.chanState,
c.signer, c.pCache, commitSpend, remoteCommit,
)
if err != nil {
return err
}
// As we've detected that the channel has been closed, immediately
// delete the state from disk, creating a close summary for future
// usage by related sub-systems.
err = c.chanState.CloseChannel(&uniClose.ChannelCloseSummary)
if err != nil {
return fmt.Errorf("unable to delete channel state: %v", err)
}
// With the event processed, we'll now notify all subscribers of the
// event.
c.Lock()
for _, sub := range c.clientSubscriptions {
// TODO(roasbeef): send msg before writing to disk
// * need to ensure proper fault tolerance in all cases
// * get ACK from the consumer of the ntfn before writing to disk?
// * no harm in repeated ntfns: at least once semantics
select {
case sub.UnilateralClosure <- uniClose:
case <-c.quit:
return fmt.Errorf("exiting")
}
}
c.Unlock()
return nil
}
// dispatchContractBreach processes a detected contract breached by the remote
// party. This method is to be called once we detect that the remote party has
// broadcast a prior revoked commitment state. This method well prepare all the
// materials required to bring the cheater to justice, then notify all
// registered subscribers of this event.
func (c *chainWatcher) dispatchContractBreach(spendEvent *chainntnfs.SpendDetail,
remoteCommit *channeldb.ChannelCommitment) error {
log.Warnf("Remote peer has breached the channel contract for "+
"ChannelPoint(%v). Revoked state #%v was broadcast!!!",
c.chanState.FundingOutpoint, remoteCommit.CommitHeight)
if err := c.chanState.MarkBorked(); err != nil {
return fmt.Errorf("unable to mark channel as borked: %v", err)
}
var (
broadcastStateNum = remoteCommit.CommitHeight
commitTxBroadcast = spendEvent.SpendingTx
spendHeight = uint32(spendEvent.SpendingHeight)
)
// Create a new reach retribution struct which contains all the data
// needed to swiftly bring the cheating peer to justice.
//
// TODO(roasbeef): move to same package
retribution, err := lnwallet.NewBreachRetribution(
c.chanState, broadcastStateNum, commitTxBroadcast,
spendHeight,
)
if err != nil {
return fmt.Errorf("unable to create breach retribution: %v", err)
}
log.Debugf("Punishment breach retribution created: %v",
spew.Sdump(retribution))
// With the event processed, we'll now notify all subscribers of the
// event.
c.Lock()
for _, sub := range c.clientSubscriptions {
select {
case sub.ContractBreach <- retribution:
case <-c.quit:
return fmt.Errorf("quitting")
}
// Wait for the breach arbiter to ACK the handoff before
// marking the channel as pending force closed in channeldb.
select {
case <-sub.ProcessACK:
// Bail if the handoff failed.
if err != nil {
return fmt.Errorf("unable to handoff "+
"retribution info: %v", err)
}
case <-c.quit:
return fmt.Errorf("quitting")
}
}
c.Unlock()
// At this point, we've successfully received an ack for the breach
// close. We now construct and persist the close summary, marking the
// channel as pending force closed.
//
// TODO(roasbeef): instead mark we got all the monies?
settledBalance := remoteCommit.LocalBalance.ToSatoshis()
closeSummary := channeldb.ChannelCloseSummary{
ChanPoint: c.chanState.FundingOutpoint,
ChainHash: c.chanState.ChainHash,
ClosingTXID: *spendEvent.SpenderTxHash,
CloseHeight: spendHeight,
RemotePub: c.chanState.IdentityPub,
Capacity: c.chanState.Capacity,
SettledBalance: settledBalance,
CloseType: channeldb.BreachClose,
IsPending: true,
ShortChanID: c.chanState.ShortChanID,
}
log.Infof("Breached channel=%v marked pending-closed",
c.chanState.FundingOutpoint)
return c.chanState.CloseChannel(&closeSummary)
}