From 8698085e3528e16417b5ddbaba0b4ff6c234b8a1 Mon Sep 17 00:00:00 2001 From: Conner Fromknecht Date: Tue, 25 Jul 2017 20:14:03 -0700 Subject: [PATCH] breacharbiter: reverts retributionInfo naming and realign diffs --- breacharbiter.go | 256 ++++++++++++++++++++++++------------------ breacharbiter_test.go | 8 +- 2 files changed, 148 insertions(+), 116 deletions(-) diff --git a/breacharbiter.go b/breacharbiter.go index 1f6a5f64..040c3afb 100644 --- a/breacharbiter.go +++ b/breacharbiter.go @@ -59,7 +59,7 @@ type breachArbiter struct { // struct to send the necessary information required to punish a // counterparty once a channel breach is detected. Breach observers // use this to communicate with the main contractObserver goroutine. - breachedContracts chan *retribution + breachedContracts chan *retributionInfo // newContracts is a channel which is used by outside subsystems to // notify the breachArbiter of a new contract (a channel) that should @@ -92,7 +92,7 @@ func newBreachArbiter(wallet *lnwallet.LightningWallet, db *channeldb.DB, retributionStore: newRetributionStore(db), breachObservers: make(map[wire.OutPoint]chan struct{}), - breachedContracts: make(chan *retribution), + breachedContracts: make(chan *retributionInfo), newContracts: make(chan *lnwallet.LightningChannel), settledContracts: make(chan *wire.OutPoint), quit: make(chan struct{}), @@ -116,7 +116,7 @@ func (b *breachArbiter) Start() error { // We load any pending retributions from the database. For each retribution // we need to restart the retribution procedure to claim our just reward. - err = b.retributionStore.ForAll(func(ret *retribution) error { + err = b.retributionStore.ForAll(func(ret *retributionInfo) error { // Register for a notification when the breach transaction is confirmed // on chain. breachTXID := &ret.commitHash @@ -366,6 +366,115 @@ out: return } +// exactRetribution is a goroutine which is executed once a contract breach has +// been detected by a breachObserver. This function is responsible for +// punishing a counterparty for violating the channel contract by sweeping ALL +// the lingering funds within the channel into the daemon's wallet. +// +// NOTE: This MUST be run as a goroutine. +func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, + breachInfo *retributionInfo) { + + defer b.wg.Done() + + // TODO(roasbeef): state needs to be checkpointed here + + select { + case _, ok := <-confChan.Confirmed: + // If the second value is !ok, then the channel has been closed + // signifying a daemon shutdown, so we exit. + if !ok { + return + } + + // Otherwise, if this is a real confirmation notification, then + // we fall through to complete our duty. + case <-b.quit: + return + } + + brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ + "revoked funds", breachInfo.commitHash) + + // With the breach transaction confirmed, we now create the justice tx + // which will claim ALL the funds within the channel. + justiceTx, err := b.createJusticeTx(breachInfo) + if err != nil { + brarLog.Errorf("unable to create justice tx: %v", err) + return + } + + brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string { + return spew.Sdump(justiceTx) + })) + + _, currentHeight, err := b.chainIO.GetBestBlock() + if err != nil { + brarLog.Errorf("unable to get current height: %v", err) + return + } + + // Finally, broadcast the transaction, finalizing the channels' + // retribution against the cheating counterparty. + if err := b.wallet.PublishTransaction(justiceTx); err != nil { + brarLog.Errorf("unable to broadcast "+ + "justice tx: %v", err) + return + } + + // As a conclusionary step, we register for a notification to be + // dispatched once the justice tx is confirmed. After confirmation we + // notify the caller that initiated the retribution workflow that the + // deed has been done. + justiceTXID := justiceTx.TxHash() + confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1, + uint32(currentHeight)) + if err != nil { + brarLog.Errorf("unable to register for conf for txid: %v", + justiceTXID) + return + } + + select { + case _, ok := <-confChan.Confirmed: + if !ok { + return + } + + // TODO(roasbeef): factor in HTLCs + revokedFunds := breachInfo.revokedOutput.amt + totalFunds := revokedFunds + breachInfo.selfOutput.amt + + brarLog.Infof("Justice for ChannelPoint(%v) has "+ + "been served, %v revoked funds (%v total) "+ + "have been claimed", breachInfo.chanPoint, + revokedFunds, totalFunds) + + // With the channel closed, mark it in the database as such. + err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint) + if err != nil { + brarLog.Errorf("unable to mark chan as closed: %v", err) + } + + // Justice has been carried out; we can safely delete the retribution + // info from the database. + err = b.retributionStore.Remove(&breachInfo.chanPoint) + if err != nil { + brarLog.Errorf("unable to remove retribution from the db: %v", err) + } + + // TODO(roasbeef): add peer to blacklist? + + // TODO(roasbeef): close other active channels with offending peer + + close(breachInfo.doneChan) + + return + case <-b.quit: + return + } +} + // breachObserver notifies the breachArbiter contract observer goroutine that a // channel's contract has been breached by the prior counterparty. Once // notified the breachArbiter will attempt to sweep ALL funds within the @@ -504,7 +613,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, // Finally, we send the retribution information into the breachArbiter // event loop to deal swift justice. // TODO(roasbeef): populate htlc breaches - b.breachedContracts <- &retribution{ + b.breachedContracts <- &retributionInfo{ commitHash: breachInfo.BreachTransaction.TxHash(), chanPoint: *chanPoint, @@ -523,6 +632,7 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, }, htlcOutputs: []*breachedOutput{}, + doneChan: make(chan struct{}), } case <-b.quit: @@ -530,120 +640,42 @@ func (b *breachArbiter) breachObserver(contract *lnwallet.LightningChannel, } } -// exactRetribution is a goroutine which is executed once a contract breach has -// been detected by a breachObserver. This function is responsible for -// punishing a counterparty for violating the channel contract by sweeping ALL -// the lingering funds within the channel into the daemon's wallet. -// -// NOTE: This MUST be run as a goroutine. -func (b *breachArbiter) exactRetribution(confChan *chainntnfs.ConfirmationEvent, - breachInfo *retribution) { +// breachedOutput contains all the information needed to sweep a breached +// output. A breached output is an output that we are now entitled to due to a +// revoked commitment transaction being broadcast. +type breachedOutput struct { + amt btcutil.Amount + outpoint wire.OutPoint - defer b.wg.Done() + signDescriptor *lnwallet.SignDescriptor + witnessType lnwallet.WitnessType - // TODO(roasbeef): state needs to be checkpointed here + twoStageClaim bool +} - select { - case _, ok := <-confChan.Confirmed: - // If the second value is !ok, then the channel has been closed - // signifying a daemon shutdown, so we exit. - if !ok { - return - } +// retributionInfo encapsulates all the data needed to sweep all the contested +// funds within a channel whose contract has been breached by the prior +// counterparty. This struct is used to create the justice transaction which +// spends all outputs of the commitment transaction into an output controlled +// by the wallet. +type retributionInfo struct { + commitHash chainhash.Hash + chanPoint wire.OutPoint - // Otherwise, if this is a real confirmation notification, then - // we fall through to complete our duty. - case <-b.quit: - return - } + selfOutput *breachedOutput - brarLog.Debugf("Breach transaction %v has been confirmed, sweeping "+ - "revoked funds", breachInfo.commitHash) + revokedOutput *breachedOutput - // With the breach transaction confirmed, we now create the justice tx - // which will claim ALL the funds within the channel. - justiceTx, err := b.createJusticeTx(breachInfo) - if err != nil { - brarLog.Errorf("unable to create justice tx: %v", err) - return - } + htlcOutputs []*breachedOutput - brarLog.Debugf("Broadcasting justice tx: %v", newLogClosure(func() string { - return spew.Sdump(justiceTx) - })) - - _, currentHeight, err := b.chainIO.GetBestBlock() - if err != nil { - brarLog.Errorf("unable to get current height: %v", err) - return - } - - // Finally, broadcast the transaction, finalizing the channels' - // retribution against the cheating counterparty. - if err := b.wallet.PublishTransaction(justiceTx); err != nil { - brarLog.Errorf("unable to broadcast "+ - "justice tx: %v", err) - return - } - - // As a conclusionary step, we register for a notification to be - // dispatched once the justice tx is confirmed. After confirmation we - // notify the caller that initiated the retribution workflow that the - // deed has been done. - justiceTXID := justiceTx.TxHash() - confChan, err = b.notifier.RegisterConfirmationsNtfn(&justiceTXID, 1, - uint32(currentHeight)) - if err != nil { - brarLog.Errorf("unable to register for conf for txid: %v", - justiceTXID) - return - } - - select { - case _, ok := <-confChan.Confirmed: - if !ok { - return - } - - // TODO(roasbeef): factor in HTLCs - revokedFunds := breachInfo.revokedOutput.amt - totalFunds := revokedFunds + breachInfo.selfOutput.amt - - brarLog.Infof("Justice for ChannelPoint(%v) has "+ - "been served, %v revoked funds (%v total) "+ - "have been claimed", breachInfo.chanPoint, - revokedFunds, totalFunds) - - // With the channel closed, mark it in the database as such. - err := b.db.MarkChanFullyClosed(&breachInfo.chanPoint) - if err != nil { - brarLog.Errorf("unable to mark chan as closed: %v", err) - } - - // Justice has been carried out; we can safely delete the retribution - // info from the database. - err = b.retributionStore.Remove(&breachInfo.chanPoint) - if err != nil { - brarLog.Errorf("unable to remove retribution from the db: %v", err) - } - - // TODO(roasbeef): add peer to blacklist? - - // TODO(roasbeef): close other active channels with offending peer - - close(breachInfo.doneChan) - - return - case <-b.quit: - return - } + doneChan chan struct{} } // createJusticeTx creates a transaction which exacts "justice" by sweeping ALL // the funds within the channel which we are now entitled to due to a breach of // the channel's contract by the counterparty. This function returns a *fully* // signed transaction with the witness for each input fully in place. -func (b *breachArbiter) createJusticeTx(r *retribution) (*wire.MsgTx, error) { +func (b *breachArbiter) createJusticeTx(r *retributionInfo) (*wire.MsgTx, error) { // First, we obtain a new public key script from the wallet which we'll // sweep the funds to. @@ -784,7 +816,7 @@ type breachedOutput struct { // counterparty. This struct is used to create the justice transaction which // spends all outputs of the commitment transaction into an output controlled // by the wallet. -type retribution struct { +type retributionInfo struct { commitHash chainhash.Hash chanPoint wire.OutPoint @@ -792,7 +824,7 @@ type retribution struct { revokedOutput *breachedOutput htlcOutputs []*breachedOutput - doneChan chan struct{} + doneChan chan struct{} } // retributionStore handles persistence of retribution states to disk and is @@ -812,7 +844,7 @@ func newRetributionStore(db *channeldb.DB) *retributionStore { // Add adds a retribution state to the retributionStore, which is then persisted // to disk. -func (rs *retributionStore) Add(ret *retribution) error { +func (rs *retributionStore) Add(ret *retributionInfo) error { return rs.db.Update(func(tx *bolt.Tx) error { // If this is our first contract breach, the retributionBucket won't // exist, in which case, we just create a new bucket. @@ -867,7 +899,7 @@ func (rs *retributionStore) Remove(key *wire.OutPoint) error { // ForAll iterates through all stored retributions and executes the passed // callback function on each retribution. -func (rs *retributionStore) ForAll(cb func(*retribution) error) error { +func (rs *retributionStore) ForAll(cb func(*retributionInfo) error) error { return rs.db.View(func(tx *bolt.Tx) error { // If the bucket does not exist, then there are no pending retributions. retBucket := tx.Bucket(retributionBucket) @@ -878,7 +910,7 @@ func (rs *retributionStore) ForAll(cb func(*retribution) error) error { // Otherwise, we fetch each serialized retribution info, deserialize // it, and execute the passed in callback function on it. return retBucket.ForEach(func(outBytes, retBytes []byte) error { - ret := &retribution{} + ret := &retributionInfo{} if err := ret.Decode(bytes.NewBuffer(retBytes)); err != nil { return err } @@ -889,7 +921,7 @@ func (rs *retributionStore) ForAll(cb func(*retribution) error) error { } // Encode serializes the retribution into the passed byte stream. -func (ret *retribution) Encode(w io.Writer) error { +func (ret *retributionInfo) Encode(w io.Writer) error { if _, err := w.Write(ret.commitHash[:]); err != nil { return err } @@ -921,7 +953,7 @@ func (ret *retribution) Encode(w io.Writer) error { } // Dencode deserializes a retribution from the passed byte stream. -func (ret *retribution) Decode(r io.Reader) error { +func (ret *retributionInfo) Decode(r io.Reader) error { var scratch [32]byte if _, err := io.ReadFull(r, scratch[:]); err != nil { diff --git a/breacharbiter_test.go b/breacharbiter_test.go index 4ab15193..d2732d8f 100644 --- a/breacharbiter_test.go +++ b/breacharbiter_test.go @@ -191,7 +191,7 @@ var ( }, } - retributions = []retribution{ + retributions = []retributionInfo{ { commitHash: [chainhash.HashSize]byte{ 0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab, @@ -284,7 +284,7 @@ func TestRetributionSerialization(t *testing.T) { t.Fatalf("unable to serialize retribution [%v]: %v", i, err) } - desRet := &retribution{} + desRet := &retributionInfo{} if err := desRet.Decode(&buf); err != nil { t.Fatalf("unable to deserialize retribution [%v]: %v", i, err) } @@ -330,7 +330,7 @@ func makeTestDB() (*channeldb.DB, func(), error) { func countRetributions(t *testing.T, rs *retributionStore) int { count := 0 - err := rs.ForAll(func(_ *retribution) error { + err := rs.ForAll(func(_ *retributionInfo) error { count++ return nil }) @@ -374,7 +374,7 @@ func TestRetributionStore(t *testing.T) { // Retrieving the retribution states from the store should yield the same // values as the originals. - rs.ForAll(func(ret *retribution) error { + rs.ForAll(func(ret *retributionInfo) error { equal0 := reflect.DeepEqual(ret, &retributions[0]) equal1 := reflect.DeepEqual(ret, &retributions[1]) if !equal0 || !equal1 {