From fb228a0f7d09ca67408e449688dcede6ac885e26 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Mon, 20 Nov 2017 23:56:42 -0800 Subject: [PATCH] breacharbiter: reliable handoff from wallet --- breacharbiter.go | 543 ++++++++++++++++++++++++++--------------------- 1 file changed, 297 insertions(+), 246 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index a94608a1..7f9a5dcb 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -15,33 +15,34 @@ import ( "github.com/lightningnetwork/lnd/htlcswitch" "github.com/lightningnetwork/lnd/lnwallet" "github.com/roasbeef/btcd/blockchain" - "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" "github.com/roasbeef/btcd/txscript" "github.com/roasbeef/btcd/wire" "github.com/roasbeef/btcutil" ) -// retributionBucket stores retribution state on disk between detecting a -// contract breach, broadcasting a justice transaction that sweeps the channel, -// and finally witnessing the justice transaction confirm on the blockchain. It -// is critical that such state is persisted on disk, so that if our node -// restarts at any point during the retribution procedure, we can recover and -// continue from the persisted state. -var retributionBucket = []byte("retribution") +var ( + // retributionBucket stores retribution state on disk between detecting + // a contract breach, broadcasting a justice transaction that sweeps the + // channel, and finally witnessing the justice transaction confirm on + // the blockchain. It is critical that such state is persisted on disk, + // so that if our node restarts at any point during the retribution + // procedure, we can recover and continue from the persisted state. + retributionBucket = []byte("retribution") + + // justiceTxnBucket holds the finalized justice transactions for all + // breached contracts. Entries are added to the justice txn bucket just + // before broadcasting the sweep txn. + justiceTxnBucket = []byte("justice-txn") +) // BreachConfig bundles the required subsystems used by the breach arbiter. An // instance of BreachConfig is passed to newBreachArbiter during instantiation. type BreachConfig struct { - // ChainIO is used by the breach arbiter to determine the current height - // of the blockchain, which is required to subscribe for spend - // notifications from Notifier. - ChainIO lnwallet.BlockChainIO - // CloseLink allows the breach arbiter to shutdown any channel links for // which it detects a breach, ensuring now further activity will - // continue across the link. The method accepts link's channel point and a - // close type to be included in the channel close summary. + // continue across the link. The method accepts link's channel point and + // a close type to be included in the channel close summary. CloseLink func(*wire.OutPoint, htlcswitch.ChannelCloseType) // DB provides access to the user's channels, allowing the breach @@ -85,6 +86,9 @@ type BreachConfig struct { // counterparties. // TODO(roasbeef): closures in config for subsystem pointers to decouple? type breachArbiter struct { + started uint32 + stopped uint32 + cfg *BreachConfig // breachObservers is a map which tracks all the active breach @@ -112,10 +116,8 @@ type breachArbiter struct { // breach closes. settledContracts chan *wire.OutPoint - started uint32 - stopped uint32 - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + wg sync.WaitGroup } // newBreachArbiter creates a new instance of a breachArbiter initialized with @@ -141,40 +143,52 @@ func (b *breachArbiter) Start() error { brarLog.Tracef("Starting breach arbiter") - // We load all pending retributions from the database and - // deterministically reconstruct a channel close summary for each. In - // the event that a channel is still open after being breached, we can - // use the close summary to reinitiate a channel close so that the - // breach is reflected in channeldb. + // Load all retributions currently persisted in the retribution store. breachRetInfos := make(map[wire.OutPoint]retributionInfo) - closeSummaries := make(map[wire.OutPoint]channeldb.ChannelCloseSummary) - err := b.cfg.Store.ForAll(func(ret *retributionInfo) error { - // Extract emitted retribution information. + if err := b.cfg.Store.ForAll(func(ret *retributionInfo) error { breachRetInfos[ret.chanPoint] = *ret - - // Deterministically reconstruct channel close summary from - // persisted retribution information and record in breach close - // summaries map under the corresponding channel point. - closeSummary := channeldb.ChannelCloseSummary{ - ChanPoint: ret.chanPoint, - ClosingTXID: ret.commitHash, - RemotePub: ret.remoteIdentity, - Capacity: ret.capacity, - SettledBalance: ret.settledBalance, - CloseType: channeldb.BreachClose, - IsPending: true, - } - closeSummaries[ret.chanPoint] = closeSummary - return nil - }) - if err != nil { + }); err != nil { return err } + // Load all currently closed channels from disk, we will use the + // channels that have been marked fully closed to filter the retribution + // information loaded from disk. This is necessary in the event that the + // channel was marked fully closed, but was not removed from the + // retribution store. + closedChans, err := b.cfg.DB.FetchClosedChannels(false) + if err != nil { + brarLog.Errorf("unable to fetch closing channels: %v", err) + return err + } + + // Using the set of non-pending, closed channels, reconcile any + // discrepancies between the channeldb and the retribution store by + // removing any retribution information for which we have already + // finished our responsibilities. If the removal is successful, we also + // remove the entry from our in-memory map, to avoid any further action + // for this channel. + for _, chanSummary := range closedChans { + if chanSummary.IsPending { + continue + } + + chanPoint := &chanSummary.ChanPoint + if _, ok := breachRetInfos[*chanPoint]; ok { + if err := b.cfg.Store.Remove(chanPoint); err != nil { + brarLog.Errorf("unable to remove closed "+ + "chanid=%v from breach arbiter: %v", + chanPoint, err) + return err + } + delete(breachRetInfos, *chanPoint) + } + } + // We need to query that database state for all currently active - // channels, each of these channels will need a goroutine assigned to - // it to watch for channel breaches. + // channels, these channels will represent a super set of all channels + // that may be assigned a go routine to monitor for channel breaches. activeChannels, err := b.cfg.DB.FetchAllChannels() if err != nil && err != channeldb.ErrNoActiveChannels { brarLog.Errorf("unable to fetch active channels: %v", err) @@ -188,29 +202,20 @@ func (b *breachArbiter) Start() error { } // Here we will determine a set of channels that will need to be managed - // by the contractObserver. For each of the open channels read from - // disk, we will create a channel state machine that can be used to - // watch for any potential channel closures. We must first exclude any - // channel whose retribution process has been initiated, and proceed to - // mark them as closed. The state machines generated for these filtered - // channels can be discarded, as their fate will be placed in the hands - // of an exactRetribution task spawned later. - // - // NOTE: Spawning of the exactRetribution task is intentionally - // postponed until after this step in order to ensure that the all - // breached channels are reflected as closed in channeldb and consistent - // with what is checkpointed by the breach arbiter. Instead of treating - // the breached-and-closed and breached-but-still-active channels as - // separate sets of channels, we first ensure that all - // breached-but-still-active channels are promoted to - // breached-and-closed during restart, allowing us to treat them as a - // single set from here on out. This approach also has the added benefit - // of minimizing the likelihood that the wrong number of tasks are - // spawned per breached channel, and prevents us from being in a - // position where retribution has completed but the channel is still - // marked as open in channeldb. + // by the contractObserver. This should comprise all active channels + // that have not been breached. If the channel point has an entry in the + // retribution store, we skip it to avoid creating a breach observer. + // Resolving breached channels will be handled later by spawning an + // exactRetribution task for each. channelsToWatch := make([]*lnwallet.LightningChannel, 0, nActive) for _, chanState := range activeChannels { + // If this channel was previously breached, we skip it here to + // avoid creating a breach observer, as we can go straight to + // the task of exacting retribution. + if _, ok := breachRetInfos[chanState.FundingOutpoint]; ok { + continue + } + // Initialize active channel from persisted channel state. channel, err := lnwallet.NewLightningChannel(nil, b.cfg.Notifier, b.cfg.Estimator, chanState) @@ -220,62 +225,28 @@ func (b *breachArbiter) Start() error { return err } - // Before marking this as an active channel that the breach - // arbiter should watch, check to see if this channel was - // previously breached. If so, we attempt to reflect this in the - // channeldb by closing the channel. Upon success, we continue - // because the channel is no longer open, and thus does not need - // to be managed by the contractObserver. - chanPoint := chanState.FundingOutpoint - if closeSummary, ok := closeSummaries[chanPoint]; ok { - // Since this channel should not be open, we - // immediately notify the HTLC switch that this link - // should be closed, and that all activity on the link - // should cease. - b.cfg.CloseLink(&chanState.FundingOutpoint, - htlcswitch.CloseBreach) - - // Ensure channeldb is consistent with the persisted - // breach. - err := channel.DeleteState(&closeSummary) - if err != nil { - brarLog.Errorf("unable to delete channel "+ - "state: %v", err) - return err - } - - // Now that this channel is both breached _and_ closed, - // we can skip adding it to the `channelsToWatch` since - // we can begin the retribution process immediately. - continue - } - // Finally, add this channel to breach arbiter's list of // channels to watch. channelsToWatch = append(channelsToWatch, channel) } - // TODO(roasbeef): instead use closure height of channel - _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() - if err != nil { - return err - } - // Additionally, we'll also want to watch any pending close or force // close transactions so we can properly mark them as resolved in the // database. - if err := b.watchForPendingCloseConfs(currentHeight); err != nil { + if err := b.watchForPendingCloseConfs(); err != nil { return err } // Spawn the exactRetribution tasks to monitor and resolve any breaches // that were loaded from the retribution store. - for chanPoint, closeSummary := range closeSummaries { + for chanPoint := range breachRetInfos { + retInfo := breachRetInfos[chanPoint] + // Register for a notification when the breach transaction is // confirmed on chain. - breachTXID := closeSummary.ClosingTXID + breachTXID := retInfo.commitHash confChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - &breachTXID, 1, uint32(currentHeight)) + &breachTXID, 1, retInfo.breachHeight) if err != nil { brarLog.Errorf("unable to register for conf updates "+ "for txid: %v, err: %v", breachTXID, err) @@ -284,7 +255,6 @@ func (b *breachArbiter) Start() error { // Launch a new goroutine which to finalize the channel // retribution after the breach transaction confirms. - retInfo := breachRetInfos[chanPoint] b.wg.Add(1) go b.exactRetribution(confChan, &retInfo) } @@ -298,12 +268,13 @@ func (b *breachArbiter) Start() error { // watchForPendingCloseConfs dispatches confirmation notification subscribers // that mark any pending channels as fully closed when signaled. -func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { +func (b *breachArbiter) watchForPendingCloseConfs() error { pendingCloseChans, err := b.cfg.DB.FetchClosedChannels(true) if err != nil { brarLog.Errorf("unable to fetch closing channels: %v", err) return err } + for _, pendingClose := range pendingCloseChans { // If this channel was force closed, and we have a non-zero // time-locked balance, then the utxoNursery is currently @@ -319,7 +290,7 @@ func (b *breachArbiter) watchForPendingCloseConfs(currentHeight int32) error { closeTXID := pendingClose.ClosingTXID confNtfn, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - &closeTXID, 1, uint32(currentHeight)) + &closeTXID, 1, pendingClose.CloseHeight) if err != nil { return err } @@ -376,6 +347,12 @@ func (b *breachArbiter) Stop() error { return nil } +// IsBreached queries the breach arbiter's retribution store to see if it is +// aware of any channel breaches for a particular channel point. +func (b *breachArbiter) IsBreached(chanPoint *wire.OutPoint) (bool, error) { + return b.cfg.Store.IsBreached(chanPoint) +} + // contractObserver is the primary goroutine for the breachArbiter. This // goroutine is responsible for managing goroutines that watch for breaches for // all current active and newly created channels. If a channel breach is @@ -389,13 +366,16 @@ func (b *breachArbiter) contractObserver( defer b.wg.Done() + brarLog.Infof("Starting contract observer with %v active channels", + len(activeChannels)) + // For each active channel found within the database, we launch a // detected breachObserver goroutine for that channel and also track // the new goroutine within the breachObservers map so we can cancel it // later if necessary. for _, channel := range activeChannels { settleSignal := make(chan struct{}) - chanPoint := channel.ChannelPoint() + chanPoint := channel.ChanPoint b.breachObservers[*chanPoint] = settleSignal b.wg.Add(1) @@ -409,12 +389,6 @@ out: for { select { case breachInfo := <-b.breachedContracts: - _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() - if err != nil { - brarLog.Errorf("unable to get best height: %v", - err) - } - // A new channel contract has just been breached! We // first register for a notification to be dispatched // once the breach transaction (the revoked commitment @@ -422,7 +396,7 @@ out: // ensure we're not dealing with a moving target. breachTXID := &breachInfo.commitHash cfChan, err := b.cfg.Notifier.RegisterConfirmationsNtfn( - breachTXID, 1, uint32(currentHeight)) + breachTXID, 1, breachInfo.breachHeight) if err != nil { brarLog.Errorf("unable to register for conf "+ "updates for txid: %v, err: %v", @@ -449,7 +423,7 @@ out: // daemon, so we launch a new breachObserver to handle // the detection of attempted contract breaches. settleSignal := make(chan struct{}) - chanPoint := contract.ChannelPoint() + chanPoint := contract.ChanPoint // If the contract is already being watched, then an // additional send indicates we have a stale version of @@ -516,14 +490,17 @@ func (b *breachArbiter) exactRetribution( // TODO(roasbeef): state needs to be checkpointed here + var breachConfHeight uint32 select { - case _, ok := <-confChan.Confirmed: + case breachConf, ok := <-confChan.Confirmed: // If the second value is !ok, then the channel has been closed // signifying a daemon shutdown, so we exit. if !ok { return } + breachConfHeight = breachConf.BlockHeight + // Otherwise, if this is a real confirmation notification, then // we fall through to complete our duty. case <-b.quit: @@ -533,40 +510,55 @@ func (b *breachArbiter) exactRetribution( brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ "revoked funds", breachInfo.commitHash) - // With the breach transaction confirmed, we now create the justice tx - // which will claim ALL the funds within the channel. - justiceTx, err := b.createJusticeTx(breachInfo) + finalTx, err := b.cfg.Store.GetFinalizedTxn(&breachInfo.chanPoint) if err != nil { - brarLog.Errorf("unable to create justice tx: %v", err) + brarLog.Errorf("unable to get finalized txn for"+ + "chanid=%v: %v", &breachInfo.chanPoint, err) return } + // If this retribution has not been finalized before, we will first + // construct a sweep transaction and write it to disk. This will allow + // the breach arbiter to re-register for notifications for the justice + // txid. + if finalTx == nil { + // With the breach transaction confirmed, we now create the + // justice tx which will claim ALL the funds within the channel. + finalTx, err = b.createJusticeTx(breachInfo) + if err != nil { + brarLog.Errorf("unable to create justice tx: %v", err) + return + } + + // Persist our finalized justice transaction before making an + // attempt to broadcast. + err := b.cfg.Store.Finalize(&breachInfo.chanPoint, finalTx) + if err != nil { + brarLog.Errorf("unable to finalize justice tx for "+ + "chanid=%v: %v", &breachInfo.chanPoint, err) + return + } + } + brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string { - return spew.Sdump(justiceTx) + return spew.Sdump(finalTx) })) - _, currentHeight, err := b.cfg.ChainIO.GetBestBlock() - if err != nil { - brarLog.Errorf("unable to get current height: %v", err) - return - } - // Finally, broadcast the transaction, finalizing the channels' // retribution against the cheating counterparty. - if err := b.cfg.PublishTransaction(justiceTx); err != nil { + if err := b.cfg.PublishTransaction(finalTx); err != nil { brarLog.Errorf("unable to broadcast "+ "justice tx: %v", err) - return } // As a conclusionary step, we register for a notification to be // dispatched once the justice tx is confirmed. After confirmation we // notify the caller that initiated the retribution workflow that the // deed has been done. - justiceTXID := justiceTx.TxHash() + justiceTXID := finalTx.TxHash() confChan, err = b.cfg.Notifier.RegisterConfirmationsNtfn( - &justiceTXID, 1, uint32(currentHeight)) + &justiceTXID, 1, breachConfHeight) if err != nil { brarLog.Errorf("unable to register for conf for txid: %v", justiceTXID) @@ -607,6 +599,7 @@ func (b *breachArbiter) exactRetribution( err := b.cfg.DB.MarkChanFullyClosed(&breachInfo.chanPoint) if err != nil { brarLog.Errorf("unable to mark chan as closed: %v", err) + return } // Justice has been carried out; we can safely delete the @@ -640,7 +633,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, defer b.wg.Done() - chanPoint := contract.ChannelPoint() + chanPoint := contract.ChanPoint brarLog.Debugf("Breach observer for ChannelPoint(%v) started ", chanPoint) @@ -746,57 +739,39 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // mid-local initiated state-transition, possible // false-positive? - // Obtain a snapshot of the final channel state, which can be - // used to reclose a breached channel in the event of a failure. - chanInfo := contract.StateSnapshot() - // Using the breach information provided by the wallet and the // channel snapshot, construct the retribution information that // will be persisted to disk. - retInfo := newRetributionInfo(chanPoint, breachInfo, chanInfo) + retInfo := newRetributionInfo(chanPoint, breachInfo) // Persist the pending retribution state to disk. - if err := b.cfg.Store.Add(retInfo); err != nil { - brarLog.Errorf("unable to persist retribution info "+ - "to db: %v", err) + err := b.cfg.Store.Add(retInfo) + if err != nil { + brarLog.Errorf("unable to persist retribution "+ + "info to db: %v", err) } - // TODO(conner): move responsibility of channel closure into - // lnwallet. Have breach arbiter ACK after writing to disk, then - // have wallet mark channel as closed. This allows the wallet to - // attempt to retransmit the breach info if the either arbiter - // or the wallet goes down before completing the hand off. + // Now that the breach has been persisted, try to send an + // acknowledgment back to the close observer with the error. If + // the ack is successful, the close observer will mark the + // channel as pending-closed in the channeldb. + select { + case breachInfo.Err <- err: + // Bail if we failed to persist retribution info. + if err != nil { + return + } - // Now that the breach arbiter has persisted the information, - // we can go ahead and mark the channel as closed in the - // channeldb. This step is done after persisting the - // retribution information so that a failure between these steps - // will cause an attempt to monitor the still-open channel. - // However, since the retribution information was persisted - // before, the arbiter will recognize that the channel should be - // closed, and proceed to mark it as such after a restart, and - // forgo monitoring it for breaches. + case <-contract.ObserverQuit(): + // If the close observer has already exited, it will + // never read the acknowledgment, so we exit. + return - // Construct the breached channel's close summary marking the - // channel using the snapshot from before, and marking this as a - // BreachClose. - closeInfo := &channeldb.ChannelCloseSummary{ - ChanPoint: *chanPoint, - ChainHash: breachInfo.ChainHash, - ClosingTXID: breachInfo.BreachTransaction.TxHash(), - RemotePub: &chanInfo.RemoteIdentity, - Capacity: chanInfo.Capacity, - SettledBalance: chanInfo.LocalBalance.ToSatoshis(), - CloseType: channeldb.BreachClose, - IsPending: true, - } - - // Next, persist the channel close to disk. Upon restart, the - // arbiter will recognize that this channel has been breached - // and marked close, and fast track its path to justice. - if err := contract.DeleteState(closeInfo); err != nil { - brarLog.Errorf("unable to delete channel state: %v", - err) + case <-b.quit: + // Cancel the close observer if the breach arbiter is + // shutting down, dropping the acknowledgment. + contract.CancelObserver() + return } // Finally, we send the retribution information into the @@ -807,6 +782,8 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, } case <-b.quit: + contract.Stop() + contract.CancelObserver() return } } @@ -922,20 +899,10 @@ var _ SpendableOutput = (*breachedOutput)(nil) // spends all outputs of the commitment transaction into an output controlled // by the wallet. type retributionInfo struct { - commitHash chainhash.Hash - chanPoint wire.OutPoint - chainHash chainhash.Hash - - // TODO(conner): remove the following group of fields after decoupling - // the breach arbiter from the wallet. - - // Fields copied from channel snapshot when a breach is detected. This - // is necessary for deterministically constructing the channel close - // summary in the event that the breach arbiter crashes before closing - // the channel. - remoteIdentity *btcec.PublicKey - capacity btcutil.Amount - settledBalance btcutil.Amount + commitHash chainhash.Hash + chanPoint wire.OutPoint + chainHash chainhash.Hash + breachHeight uint32 breachedOutputs []breachedOutput } @@ -945,8 +912,7 @@ type retributionInfo struct { // channels. The information is primarily populated using the BreachRetribution // delivered by the wallet when it detects a channel breach. func newRetributionInfo(chanPoint *wire.OutPoint, - breachInfo *lnwallet.BreachRetribution, - chanInfo *channeldb.ChannelSnapshot) *retributionInfo { + breachInfo *lnwallet.BreachRetribution) *retributionInfo { // Determine the number of second layer HTLCs we will attempt to sweep. nHtlcs := len(breachInfo.HtlcRetributions) @@ -1009,16 +975,10 @@ func newRetributionInfo(chanPoint *wire.OutPoint, breachedOutputs = append(breachedOutputs, htlcOutput) } - // TODO(conner): remove dependency on channel snapshot after decoupling - // channel closure from the breach arbiter. - return &retributionInfo{ commitHash: breachInfo.BreachTransaction.TxHash(), - chainHash: chanInfo.ChainHash, + chainHash: breachInfo.ChainHash, chanPoint: *chanPoint, - remoteIdentity: &chanInfo.RemoteIdentity, - capacity: chanInfo.Capacity, - settledBalance: chanInfo.LocalBalance.ToSatoshis(), breachedOutputs: breachedOutputs, } } @@ -1226,6 +1186,19 @@ type RetributionStore interface { // the addition fails. Add(retInfo *retributionInfo) error + // IsBreached queries the retribution store to see if the breach arbiter + // is aware of any breaches for the provided channel point. + IsBreached(chanPoint *wire.OutPoint) (bool, error) + + // Finalize persists the finalized justice transaction for a particular + // channel. + Finalize(chanPoint *wire.OutPoint, finalTx *wire.MsgTx) error + + // GetFinalizedTxn loads the finalized justice transaction, if any, from + // the retribution store. The finalized transaction will be nil if + // Finalize has not yet been called for this channel point. + GetFinalizedTxn(chanPoint *wire.OutPoint) (*wire.MsgTx, error) + // Remove deletes the retributionInfo from disk, if any exists, under // the given key. An error should be re raised if the removal fails. Remove(key *wire.OutPoint) error @@ -1276,8 +1249,97 @@ func (rs *retributionStore) Add(ret *retributionInfo) error { }) } -// Remove removes a retribution state from the retributionStore database. -func (rs *retributionStore) Remove(key *wire.OutPoint) error { +// Finalize writes a signed justice transaction to the retribution store. This +// is done before publishing the transaction, so that we can recover the txid on +// startup and re-register for confirmation notifications. +func (rs *retributionStore) Finalize(chanPoint *wire.OutPoint, + finalTx *wire.MsgTx) error { + return rs.db.Update(func(tx *bolt.Tx) error { + justiceBkt, err := tx.CreateBucketIfNotExists(justiceTxnBucket) + if err != nil { + return err + } + + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + + var txBuf bytes.Buffer + if err := finalTx.Serialize(&txBuf); err != nil { + return err + } + + return justiceBkt.Put(chanBuf.Bytes(), txBuf.Bytes()) + }) +} + +// GetFinalizedTxn loads the finalized justice transaction for the provided +// channel point. The finalized transaction will be nil if Finalize has yet to +// be called for this channel point. +func (rs *retributionStore) GetFinalizedTxn( + chanPoint *wire.OutPoint) (*wire.MsgTx, error) { + + var finalTxBytes []byte + if err := rs.db.View(func(tx *bolt.Tx) error { + justiceBkt := tx.Bucket(justiceTxnBucket) + if justiceBkt == nil { + return nil + } + + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + + finalTxBytes = justiceBkt.Get(chanBuf.Bytes()) + + return nil + }); err != nil { + return nil, err + } + + if finalTxBytes == nil { + return nil, nil + } + + finalTx := &wire.MsgTx{} + err := finalTx.Deserialize(bytes.NewReader(finalTxBytes)) + + return finalTx, err +} + +// IsBreached queries the retribution store to discern if this channel was +// previously breached. This is used when connecting to a peer to determine if +// it is safe to add a link to the htlcswitch, as we should never add a channel +// that has already been breached. +func (rs *retributionStore) IsBreached(chanPoint *wire.OutPoint) (bool, error) { + var found bool + err := rs.db.View(func(tx *bolt.Tx) error { + retBucket := tx.Bucket(retributionBucket) + if retBucket == nil { + return nil + } + + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + + retInfo := retBucket.Get(chanBuf.Bytes()) + if retInfo != nil { + found = true + } + + return nil + }) + + return found, err +} + +// Remove removes a retribution state and finalized justice transaction by +// channel point from the retribution store. +func (rs *retributionStore) Remove(chanPoint *wire.OutPoint) error { return rs.db.Update(func(tx *bolt.Tx) error { retBucket := tx.Bucket(retributionBucket) @@ -1287,15 +1349,30 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error { // stored in the db. if retBucket == nil { return errors.New("unable to remove retribution " + - "because the db bucket doesn't exist.") + "because the retribution bucket doesn't exist.") } - var outBuf bytes.Buffer - if err := writeOutpoint(&outBuf, key); err != nil { + // Serialize the channel point we are intending to remove. + var chanBuf bytes.Buffer + if err := writeOutpoint(&chanBuf, chanPoint); err != nil { + return err + } + chanBytes := chanBuf.Bytes() + + // Remove the persisted retribution info and finalized justice + // transaction. + if err := retBucket.Delete(chanBytes); err != nil { return err } - return retBucket.Delete(outBuf.Bytes()) + // If we have not finalized this channel breach, we can exit + // early. + justiceBkt := tx.Bucket(justiceTxnBucket) + if justiceBkt == nil { + return nil + } + + return justiceBkt.Delete(chanBytes) }) } @@ -1313,11 +1390,10 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { // Otherwise, we fetch each serialized retribution info, // deserialize it, and execute the passed in callback function // on it. - return retBucket.ForEach(func(outBytes, retBytes []byte) error { + return retBucket.ForEach(func(_, retBytes []byte) error { ret := &retributionInfo{} - if err := ret.Decode( - bytes.NewBuffer(retBytes), - ); err != nil { + err := ret.Decode(bytes.NewBuffer(retBytes)) + if err != nil { return err } @@ -1328,7 +1404,7 @@ func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { // Encode serializes the retribution into the passed byte stream. func (ret *retributionInfo) Encode(w io.Writer) error { - var scratch [8]byte + var scratch [4]byte if _, err := w.Write(ret.commitHash[:]); err != nil { return err @@ -1342,18 +1418,8 @@ func (ret *retributionInfo) Encode(w io.Writer) error { return err } - if _, err := w.Write( - ret.remoteIdentity.SerializeCompressed()); err != nil { - return err - } - - binary.BigEndian.PutUint64(scratch[:8], uint64(ret.capacity)) - if _, err := w.Write(scratch[:8]); err != nil { - return err - } - - binary.BigEndian.PutUint64(scratch[:8], uint64(ret.settledBalance)) - if _, err := w.Write(scratch[:8]); err != nil { + binary.BigEndian.PutUint32(scratch[:], ret.breachHeight) + if _, err := w.Write(scratch[:]); err != nil { return err } @@ -1373,12 +1439,12 @@ func (ret *retributionInfo) Encode(w io.Writer) error { // Dencode deserializes a retribution from the passed byte stream. func (ret *retributionInfo) Decode(r io.Reader) error { - var scratch [33]byte + var scratch [32]byte - if _, err := io.ReadFull(r, scratch[:32]); err != nil { + if _, err := io.ReadFull(r, scratch[:]); err != nil { return err } - hash, err := chainhash.NewHash(scratch[:32]) + hash, err := chainhash.NewHash(scratch[:]) if err != nil { return err } @@ -1388,34 +1454,19 @@ func (ret *retributionInfo) Decode(r io.Reader) error { return err } - if _, err := io.ReadFull(r, scratch[:32]); err != nil { + if _, err := io.ReadFull(r, scratch[:]); err != nil { return err } - chainHash, err := chainhash.NewHash(scratch[:32]) + chainHash, err := chainhash.NewHash(scratch[:]) if err != nil { return err } ret.chainHash = *chainHash - if _, err = io.ReadFull(r, scratch[:33]); err != nil { + if _, err := io.ReadFull(r, scratch[:4]); err != nil { return err } - remoteIdentity, err := btcec.ParsePubKey(scratch[:33], btcec.S256()) - if err != nil { - return err - } - ret.remoteIdentity = remoteIdentity - - if _, err := io.ReadFull(r, scratch[:8]); err != nil { - return err - } - ret.capacity = btcutil.Amount(binary.BigEndian.Uint64(scratch[:8])) - - if _, err := io.ReadFull(r, scratch[:8]); err != nil { - return err - } - ret.settledBalance = btcutil.Amount( - binary.BigEndian.Uint64(scratch[:8])) + ret.breachHeight = binary.BigEndian.Uint32(scratch[:4]) nOutputsU64, err := wire.ReadVarInt(r, 0) if err != nil {