lnwallet: remove the closeObserver from the channel state machine

In this PR, we entirely remove the closeObserver from the channel state
machine. It was added very early on before most of the other aspects of
the daemon were built out. This goroutine was responsible for
dispatching notifications to outside parties if the commitment
transaction was spent at all. This had several issues, since it was
linked to the *lifetime* of the channel state machine itself. As a
result of this linkage, we had to do weird stuff like hand off in
memory pointers to the state machine in order to ensure notifications
were properly dispatched.
This commit is contained in:
Olaoluwa Osuntokun 2018-01-18 13:45:30 -08:00
parent b391049e49
commit 30c4196f91
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
5 changed files with 141 additions and 738 deletions

File diff suppressed because it is too large Load Diff

View File

@ -3,16 +3,13 @@ package lnwallet
import (
"bytes"
"crypto/sha256"
"errors"
"io/ioutil"
"math/big"
"math/rand"
"os"
"reflect"
"runtime"
"sync"
"testing"
"time"
"github.com/davecgh/go-spew/spew"
"github.com/lightningnetwork/lnd/chainntnfs"
@ -59,51 +56,6 @@ var (
numReqConfs = uint16(1)
)
// mockSpendNotifier extends the mockNotifier so that spend notifications can be
// triggered and delivered to subscribers.
type mockSpendNotifier struct {
*mockNotfier
spendMap map[wire.OutPoint][]chan *chainntnfs.SpendDetail
}
func makeMockSpendNotifier() *mockSpendNotifier {
return &mockSpendNotifier{
spendMap: make(map[wire.OutPoint][]chan *chainntnfs.SpendDetail),
}
}
func (m *mockSpendNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
spendChan := make(chan *chainntnfs.SpendDetail, 1)
m.spendMap[*outpoint] = append(m.spendMap[*outpoint], spendChan)
return &chainntnfs.SpendEvent{
Spend: spendChan,
Cancel: func() {
},
}, nil
}
// Spend dispatches SpendDetails to all subscribers of the outpoint. The details
// will include the transaction and height provided by the caller.
func (m *mockSpendNotifier) Spend(outpoint *wire.OutPoint, height int32,
txn *wire.MsgTx) {
if spendChans, ok := m.spendMap[*outpoint]; ok {
delete(m.spendMap, *outpoint)
for _, spendChan := range spendChans {
txnHash := txn.TxHash()
spendChan <- &chainntnfs.SpendDetail{
SpentOutPoint: outpoint,
SpendingHeight: height,
SpendingTx: txn,
SpenderTxHash: &txnHash,
SpenderInputIndex: outpoint.Index,
}
}
}
}
// initRevocationWindows simulates a new channel being opened within the p2p
// network by populating the initial revocation windows of the passed
// commitment state machines.
@ -168,36 +120,12 @@ func forceStateTransition(chanA, chanB *LightningChannel) error {
return nil
}
// createSpendableTestChannels initializes a pair of channels using a
// mockSpendNotifier. This allows us to test the behavior of the closeObserver,
// which is activated when the funding transaction is spent.
func createSpendableTestChannels(revocationWindow int) (*LightningChannel,
*LightningChannel, *mockSpendNotifier, func(), error) {
notifier := makeMockSpendNotifier()
alice, bob, cleanup, err := createTestChannelsWithNotifier(
revocationWindow, notifier,
)
return alice, bob, notifier, cleanup, err
}
// createTestChannels initializes a pair of channels using a mock notifier.
// createTestChannels creates two test lightning channels using the provided
// notifier. The channel itself is funded with 10 BTC, with 5 BTC allocated to
// each side. Within the channel, Alice is the initiator.
func createTestChannels(revocationWindow int) (*LightningChannel,
*LightningChannel, func(), error) {
notifier := &mockNotfier{}
return createTestChannelsWithNotifier(revocationWindow, notifier)
}
// createTestChannelsWithNotifier creates two test lightning channels using the
// provided notifier. The channel itself is funded with 10 BTC, with 5 BTC
// allocated to each side. Within the channel, Alice is the initiator.
func createTestChannelsWithNotifier(revocationWindow int,
notifier chainntnfs.ChainNotifier) (*LightningChannel,
*LightningChannel, func(), error) {
channelCapacity := btcutil.Amount(10 * 1e8)
channelBal := channelCapacity / 2
aliceDustLimit := btcutil.Amount(200)
@ -364,26 +292,21 @@ func createTestChannelsWithNotifier(revocationWindow int,
aliceSigner := &mockSigner{privkeys: aliceKeys}
bobSigner := &mockSigner{privkeys: bobKeys}
aliceNotifier := &mockNotfier{
activeSpendNtfn: make(chan *chainntnfs.SpendDetail),
}
bobNotifier := &mockNotfier{
activeSpendNtfn: make(chan *chainntnfs.SpendDetail),
}
pCache := &mockPreimageCache{
// hash -> preimage
preimageMap: make(map[[32]byte][]byte),
}
// TODO(roasbeef): make mock version of pre-image store
channelAlice, err := NewLightningChannel(aliceSigner, aliceNotifier,
pCache, aliceChannelState)
channelAlice, err := NewLightningChannel(
aliceSigner, pCache, aliceChannelState,
)
if err != nil {
return nil, nil, nil, err
}
channelBob, err := NewLightningChannel(bobSigner, bobNotifier,
pCache, bobChannelState)
channelBob, err := NewLightningChannel(
bobSigner, pCache, bobChannelState,
)
if err != nil {
return nil, nil, nil, err
}
@ -1273,106 +1196,6 @@ func TestForceCloseDustOutput(t *testing.T) {
}
}
// TestBreachClose checks that the resulting ForceCloseSummary is correct when a
// peer is ForceClosing the channel. Will check outputs both above and below
// the dust limit.
func TestBreachClose(t *testing.T) {
t.Parallel()
// Create a test channel which will be used for the duration of this
// unittest. The channel will be funded evenly with Alice having 5 BTC,
// and Bob having 5 BTC.
aliceChannel, bobChannel, notifier, cleanUp, err :=
createSpendableTestChannels(1)
if err != nil {
t.Fatalf("unable to create test channels: %v", err)
}
defer cleanUp()
// Send one HTLC from Alice to Bob, and advance the state of both
// channels.
htlcAmount := lnwire.NewMSatFromSatoshis(20000)
htlc, _ := createHTLC(0, htlcAmount)
if _, err := aliceChannel.AddHTLC(htlc); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bobChannel.ReceiveHTLC(htlc); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
// Construct a force close summary of Bob's channel, this includes the
// breach transaction that will be used to spend the funding point.
forceCloseSummary, err := bobChannel.ForceClose()
if err != nil {
t.Fatalf("unable to force close bob's channel: %v", err)
}
// Send another HTLC and advance the state of both channels again. This
// ensures that Alice's state will be ahead of the breach transaction
// generated above.
htlc2, _ := createHTLC(1, htlcAmount)
if _, err := aliceChannel.AddHTLC(htlc2); err != nil {
t.Fatalf("alice unable to add htlc: %v", err)
}
if _, err := bobChannel.ReceiveHTLC(htlc2); err != nil {
t.Fatalf("bob unable to recv add htlc: %v", err)
}
if err := forceStateTransition(aliceChannel, bobChannel); err != nil {
t.Fatalf("Can't update the channel state: %v", err)
}
chanPoint := aliceChannel.ChanPoint
breachTxn := forceCloseSummary.CloseTx
// Spend the funding point using the breach transaction.
notifier.Spend(chanPoint, 100, breachTxn)
// Set up a separate routine to monitor alice's channel for a response
// to the spend. We use a generous timeout to ensure the test doesn't
// stall indefinitely, but allows us to block the main routine until the
// close observer exits.
errChan := make(chan error, 1)
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
select {
case ret := <-aliceChannel.ContractBreach:
errChan <- nil
// Acknowledge a successful processing of the
// retribution information.
ret.Err <- nil
case <-aliceChannel.UnilateralClose:
errChan <- errors.New("expected breach close to " +
"be signaled, not unilateral")
case <-time.After(60 * time.Second):
errChan <- errors.New("breach was not signaled")
}
}()
// Wait for both the close observer to exit and our background process
// to exit before attempting to read from the error channel.
aliceChannel.WaitForClose()
wg.Wait()
// Now that all tasks have been shutdown, handle the result. The result
// should be available immediately, we allow five seconds to handle any
// variance in scheduling on travis.
select {
case err := <-errChan:
if err != nil {
t.Fatalf(err.Error())
}
case <-time.After(5 * time.Second):
t.Fatalf("breach was not received")
}
}
// TestDustHTLCFees checks that fees are calculated correctly when HTLCs fall
// below the nodes' dust limit. In these cases, the amount of the dust HTLCs
// should be applied to the commitment transaction fee.
@ -1704,15 +1527,14 @@ func TestStateUpdatePersistence(t *testing.T) {
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
notifier := aliceChannel.channelEvents
aliceChannelNew, err := NewLightningChannel(
aliceChannel.signer, notifier, nil, aliceChannels[0],
aliceChannel.signer, nil, aliceChannels[0],
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
bobChannelNew, err := NewLightningChannel(
bobChannel.signer, notifier, nil, bobChannels[0],
bobChannel.signer, nil, bobChannels[0],
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
@ -2742,16 +2564,15 @@ func TestChanSyncFullySynced(t *testing.T) {
if err != nil {
t.Fatalf("unable to fetch channel: %v", err)
}
notifier := aliceChannel.channelEvents
aliceChannelNew, err := NewLightningChannel(
aliceChannel.signer, notifier, nil, aliceChannels[0],
aliceChannel.signer, nil, aliceChannels[0],
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
}
defer aliceChannelNew.Stop()
bobChannelNew, err := NewLightningChannel(
bobChannel.signer, notifier, nil, bobChannels[0],
bobChannel.signer, nil, bobChannels[0],
)
if err != nil {
t.Fatalf("unable to create new channel: %v", err)
@ -2773,9 +2594,8 @@ func restartChannel(channelOld *LightningChannel) (*LightningChannel, error) {
return nil, err
}
notifier := channelOld.channelEvents
channelNew, err := NewLightningChannel(
channelOld.signer, notifier, channelOld.pCache, nodeChannels[0],
channelOld.signer, channelOld.pCache, nodeChannels[0],
)
if err != nil {
return nil, err

View File

@ -6,10 +6,8 @@ import (
"fmt"
"sync"
"github.com/lightningnetwork/lnd/chainntnfs"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/chaincfg"
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
@ -125,36 +123,6 @@ func (m *mockSigner) findKey(needleHash160 []byte, singleTweak []byte,
return nil
}
type mockNotfier struct {
activeSpendNtfn chan *chainntnfs.SpendDetail
}
func (m *mockNotfier) RegisterConfirmationsNtfn(txid *chainhash.Hash,
numConfs, heightHint uint32) (*chainntnfs.ConfirmationEvent, error) {
return nil, nil
}
func (m *mockNotfier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) {
return nil, nil
}
func (m *mockNotfier) Start() error {
return nil
}
func (m *mockNotfier) Stop() error {
return nil
}
func (m *mockNotfier) RegisterSpendNtfn(outpoint *wire.OutPoint,
heightHint uint32) (*chainntnfs.SpendEvent, error) {
spendChan := make(chan *chainntnfs.SpendDetail)
m.activeSpendNtfn = spendChan
return &chainntnfs.SpendEvent{
Spend: spendChan,
Cancel: func() {},
}, nil
}
type mockPreimageCache struct {
sync.Mutex
preimageMap map[[32]byte][]byte

View File

@ -73,10 +73,10 @@ func TestCommitmentSpendValidation(t *testing.T) {
// This is Alice's commitment transaction, so she must wait a CSV delay
// of 5 blocks before sweeping the output, while bob can spend
// immediately with either the revocation key, or his regular key.
keyRing := &commitmentKeyRing{
delayKey: aliceDelayKey,
revocationKey: revokePubKey,
noDelayKey: bobPayKey,
keyRing := &CommitmentKeyRing{
DelayKey: aliceDelayKey,
RevocationKey: revokePubKey,
NoDelayKey: bobPayKey,
}
commitmentTx, err := CreateCommitTx(*fakeFundingTxIn, keyRing, csvTimeout,
channelBalance, channelBalance, DefaultDustLimit())

View File

@ -534,6 +534,8 @@ func (l *LightningWallet) handleFundingReserveRequest(req *initFundingReserveMsg
// for the duration of the channel. The keys include: our multi-sig
// key, the base revocation key, the base htlc key,the base payment
// key, and the delayed payment key.
//
// TODO(roasbeef): special derivaiton?
reservation.ourContribution.MultiSigKey, err = l.NewRawKey()
if err != nil {
req.err <- err
@ -845,7 +847,7 @@ func (l *LightningWallet) handleContributionMsg(req *addContributionMsg) {
// both commitment transactions.
var stateObfuscator [StateHintSize]byte
if chanState.ChanType == channeldb.SingleFunder {
stateObfuscator = deriveStateHintObfuscator(
stateObfuscator = DeriveStateHintObfuscator(
ourContribution.PaymentBasePoint,
theirContribution.PaymentBasePoint,
)
@ -854,12 +856,12 @@ func (l *LightningWallet) handleContributionMsg(req *addContributionMsg) {
theirSer := theirContribution.PaymentBasePoint.SerializeCompressed()
switch bytes.Compare(ourSer, theirSer) {
case -1:
stateObfuscator = deriveStateHintObfuscator(
stateObfuscator = DeriveStateHintObfuscator(
ourContribution.PaymentBasePoint,
theirContribution.PaymentBasePoint,
)
default:
stateObfuscator = deriveStateHintObfuscator(
stateObfuscator = DeriveStateHintObfuscator(
theirContribution.PaymentBasePoint,
ourContribution.PaymentBasePoint,
)
@ -1161,7 +1163,7 @@ func (l *LightningWallet) handleSingleFunderSigs(req *addSingleFunderSigsMsg) {
// With both commitment transactions constructed, we can now use the
// generator state obfuscator to encode the current state number within
// both commitment transactions.
stateObfuscator := deriveStateHintObfuscator(
stateObfuscator := DeriveStateHintObfuscator(
pendingReservation.theirContribution.PaymentBasePoint,
pendingReservation.ourContribution.PaymentBasePoint)
err = initStateHints(ourCommitTx, theirCommitTx, stateObfuscator)
@ -1349,7 +1351,7 @@ func (l *LightningWallet) deriveMasterRevocationRoot() (*btcec.PrivateKey, error
return masterElkremRoot.ECPrivKey()
}
// deriveStateHintObfuscator derives the bytes to be used for obfuscating the
// DeriveStateHintObfuscator derives the bytes to be used for obfuscating the
// state hints from the root to be used for a new channel. The obsfucsator is
// generated via the following computation:
//
@ -1357,7 +1359,7 @@ func (l *LightningWallet) deriveMasterRevocationRoot() (*btcec.PrivateKey, error
// * where both keys are the multi-sig keys of the respective parties
//
// The first 6 bytes of the resulting hash are used as the state hint.
func deriveStateHintObfuscator(key1, key2 *btcec.PublicKey) [StateHintSize]byte {
func DeriveStateHintObfuscator(key1, key2 *btcec.PublicKey) [StateHintSize]byte {
h := sha256.New()
h.Write(key1.SerializeCompressed())
h.Write(key2.SerializeCompressed())