utxonursery: added persistence to transaction output states

Moved transaction states from in-memory maps to persistent BoltDB
buckets. This allows channel force closes to operate reliably if the
daemon is shut down and restarted at any point during the forced
channel closure process.
This commit is contained in:
bryanvu 2016-12-13 15:32:44 -08:00 committed by Olaoluwa Osuntokun
parent 90ed23e6aa
commit aa04f82a15
6 changed files with 1062 additions and 200 deletions

View File

@ -21,6 +21,11 @@ const (
notifierType = "btcd"
)
var (
ErrChainNotifierShuttingDown = errors.New("chainntnfs: system interrupt " +
"while attempting to register for spend notification.")
)
// chainUpdate encapsulates an update to the current main chain. This struct is
// used as an element within an unbounded queue in order to avoid blocking the
// main rpc dispatch rule.
@ -549,8 +554,7 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.S
select {
case <-b.quit:
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for spend notification.")
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn:
}
@ -611,8 +615,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash,
select {
case <-b.quit:
return nil, errors.New("chainntnfs: system interrupt while " +
"attempting to register for confirmation notification.")
return nil, ErrChainNotifierShuttingDown
case b.notificationRegistry <- ntfn:
return &chainntnfs.ConfirmationEvent{
Confirmed: ntfn.finConf,

View File

@ -172,7 +172,7 @@ func openChannelAndAssert(t *harnessTest, net *networkHarness, ctx context.Conte
func closeChannelAndAssert(t *harnessTest, net *networkHarness, ctx context.Context,
node *lightningNode, fundingChanPoint *lnrpc.ChannelPoint, force bool) *wire.ShaHash {
closeUpdates, err := net.CloseChannel(ctx, node, fundingChanPoint, force)
closeUpdates, _, err := net.CloseChannel(ctx, node, fundingChanPoint, force)
if err != nil {
t.Fatalf("unable to close channel: %v", err)
}
@ -275,11 +275,13 @@ func testChannelBalance(net *networkHarness, t *harnessTest) {
// force closes the channel after some cursory assertions. Within the test, two
// transactions should be broadcast on-chain, the commitment transaction itself
// (which closes the channel), and the sweep transaction a few blocks later
// once the output(s) become mature.
// once the output(s) become mature. This test also includes several restarts
// to ensure that the transaction output states are persisted throughout
// the forced closure process.
//
// TODO(roabeef): also add an unsettled HTLC before force closing.
// TODO(roasbeef): also add an unsettled HTLC before force closing.
func testChannelForceClosure(net *networkHarness, t *harnessTest) {
timeout := time.Duration(time.Second * 5)
timeout := time.Duration(time.Second * 10)
ctxb := context.Background()
// First establish a channel ween with a capacity of 100k satoshis
@ -306,28 +308,63 @@ func testChannelForceClosure(net *networkHarness, t *harnessTest) {
// the channel. This will also assert that the commitment transaction
// was immediately broadcast in order to fulfill the force closure
// request.
closeUpdate, err := net.CloseChannel(ctxb, net.Alice, chanPoint, true)
_, closingTxID, err := net.CloseChannel(ctxb, net.Alice, chanPoint, true)
if err != nil {
t.Fatalf("unable to execute force channel closure: %v", err)
}
// The several restarts in this test are intended to ensure that when a
// channel is force-closed, the UTXO nursery has persisted the state of
// the channel in the closure process and will recover the correct state
// when the system comes back on line. This restart tests state
// persistence at the beginning of the process, when the commitment
// transaction has been broadcast but not yet confirmed in a block.
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
// Mine a block which should confirm the commitment transaction
// broadcast as a result of the force closure.
if _, err := net.Miner.Node.Generate(1); err != nil {
t.Fatalf("unable to generate block: %v", err)
}
ctxt, _ = context.WithTimeout(ctxb, timeout)
closingTxID, err := net.WaitForChannelClose(ctxt, closeUpdate)
if err != nil {
t.Fatalf("error while waiting for channel close: %v", err)
// The following sleep provides time for the UTXO nursery to move the
// output from the preschool to the kindergarten database buckets
// prior to RestartNode() being triggered. Without this sleep, the
// database update may fail, causing the UTXO nursery to retry the move
// operation upon restart. This will change the blockheights from what
// is expected by the test.
// TODO(bvu): refactor out this sleep.
duration := time.Millisecond * 300
time.Sleep(duration)
// The following restart is intended to ensure that outputs from the
// force close commitment transaction have been persisted once the
// transaction has been confirmed, but before the outputs are spendable
// (the "kindergarten" bucket.)
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
// Currently within the codebase, the default CSV is 4 relative blocks.
// So generate exactly 4 new blocks.
// For the persistence test, we generate three blocks, then trigger
// a restart and then generate the final block that should trigger
// the creation of the sweep transaction.
// TODO(roasbeef): should check default value in config here instead,
// or make delay a param
const defaultCSV = 4
if _, err := net.Miner.Node.Generate(defaultCSV); err != nil {
if _, err := net.Miner.Node.Generate(defaultCSV - 1); err != nil {
t.Fatalf("unable to mine blocks: %v", err)
}
// The following restart checks to ensure that outputs in the kindergarten
// bucket are persisted while waiting for the required number of
// confirmations to be reported.
if err := net.RestartNode(net.Alice, nil); err != nil {
t.Fatalf("Node restart failed: %v", err)
}
if _, err := net.Miner.Node.Generate(1); err != nil {
t.Fatalf("unable to mine blocks: %v", err)
}
@ -336,20 +373,21 @@ func testChannelForceClosure(net *networkHarness, t *harnessTest) {
// broadcast.
var sweepingTXID *wire.ShaHash
var mempool []*wire.ShaHash
mempoolTimeout := time.After(3 * time.Second)
checkMempoolTick := time.Tick(100 * time.Millisecond)
mempoolPoll:
for {
select {
case <-time.After(time.Second * 5):
case <-mempoolTimeout:
t.Fatalf("sweep tx not found in mempool")
default:
case <-checkMempoolTick:
mempool, err = net.Miner.Node.GetRawMempool()
if err != nil {
t.Fatalf("unable to fetch node's mempool: %v", err)
}
if len(mempool) == 0 {
continue
if len(mempool) != 0 {
break mempoolPoll
}
break mempoolPoll
}
}

View File

@ -734,7 +734,7 @@ func (n *networkHarness) WaitForChannelOpen(ctx context.Context,
// pending, then an error is returned.
func (n *networkHarness) CloseChannel(ctx context.Context,
lnNode *lightningNode, cp *lnrpc.ChannelPoint,
force bool) (lnrpc.Lightning_CloseChannelClient, error) {
force bool) (lnrpc.Lightning_CloseChannelClient, *wire.ShaHash, error) {
closeReq := &lnrpc.CloseChannelRequest{
ChannelPoint: cp,
@ -742,11 +742,11 @@ func (n *networkHarness) CloseChannel(ctx context.Context,
}
closeRespStream, err := lnNode.CloseChannel(ctx, closeReq)
if err != nil {
return nil, fmt.Errorf("unable to close channel: %v", err)
return nil, nil, fmt.Errorf("unable to close channel: %v", err)
}
errChan := make(chan error)
fin := make(chan struct{})
fin := make(chan *wire.ShaHash)
go func() {
// Consume the "channel close" update in order to wait for the closing
// transaction to be broadcast, then wait for the closing tx to be seen
@ -772,20 +772,19 @@ func (n *networkHarness) CloseChannel(ctx context.Context,
errChan <- err
return
}
close(fin)
fin <- closeTxid
}()
// Wait until either the deadline for the context expires, an error
// occurs, or the channel close update is received.
select {
case <-ctx.Done():
return nil, fmt.Errorf("timeout reached before channel close " +
return nil, nil, fmt.Errorf("timeout reached before channel close " +
"initiated")
case err := <-errChan:
return nil, err
case <-fin:
return closeRespStream, nil
return nil, nil, err
case closeTxid := <-fin:
return closeRespStream, closeTxid, nil
}
}

View File

@ -108,7 +108,7 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier,
chanDB: chanDB,
invoices: newInvoiceRegistry(chanDB),
utxoNursery: newUtxoNursery(notifier, wallet),
utxoNursery: newUtxoNursery(chanDB, notifier, wallet),
htlcSwitch: newHtlcSwitch(),
identityPriv: privKey,

File diff suppressed because it is too large Load Diff

244
utxonursery_test.go Normal file
View File

@ -0,0 +1,244 @@
package main
import (
"bytes"
"fmt"
"reflect"
"testing"
"github.com/lightningnetwork/lnd/lnwallet"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
var (
outPoints = []wire.OutPoint{
wire.OutPoint{
Hash: [wire.HashSize]byte{
0x51, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x48, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0x2d, 0xe7, 0x93, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1f, 0xb, 0x4c, 0xf9, 0x9e, 0xc5, 0x8c, 0xe9,
},
Index: 9,
},
wire.OutPoint{
Hash: [wire.HashSize]byte{
0xb7, 0x94, 0x38, 0x5f, 0x2d, 0x1e, 0xf7, 0xab,
0x4d, 0x92, 0x73, 0xd1, 0x90, 0x63, 0x81, 0xb4,
0x4f, 0x2f, 0x6f, 0x25, 0x88, 0xa3, 0xef, 0xb9,
0x6a, 0x49, 0x18, 0x83, 0x31, 0x98, 0x47, 0x53,
},
Index: 49,
},
wire.OutPoint{
Hash: [wire.HashSize]byte{
0x81, 0xb6, 0x37, 0xd8, 0xfc, 0xd2, 0xc6, 0xda,
0x63, 0x59, 0xe6, 0x96, 0x31, 0x13, 0xa1, 0x17,
0xd, 0xe7, 0x95, 0xe4, 0xb7, 0x25, 0xb8, 0x4d,
0x1e, 0xb, 0x4c, 0xfd, 0x9e, 0xc5, 0x8c, 0xe9,
},
Index: 23,
},
}
keys = [][]byte{
[]byte{0x04, 0x11, 0xdb, 0x93, 0xe1, 0xdc, 0xdb, 0x8a,
0x01, 0x6b, 0x49, 0x84, 0x0f, 0x8c, 0x53, 0xbc, 0x1e,
0xb6, 0x8a, 0x38, 0x2e, 0x97, 0xb1, 0x48, 0x2e, 0xca,
0xd7, 0xb1, 0x48, 0xa6, 0x90, 0x9a, 0x5c, 0xb2, 0xe0,
0xea, 0xdd, 0xfb, 0x84, 0xcc, 0xf9, 0x74, 0x44, 0x64,
0xf8, 0x2e, 0x16, 0x0b, 0xfa, 0x9b, 0x8b, 0x64, 0xf9,
0xd4, 0xc0, 0x3f, 0x99, 0x9b, 0x86, 0x43, 0xf6, 0x56,
0xb4, 0x12, 0xa3,
},
[]byte{0x07, 0x11, 0xdb, 0x93, 0xe1, 0xdc, 0xdb, 0x8a,
0x01, 0x6b, 0x49, 0x84, 0x0f, 0x8c, 0x53, 0xbc, 0x1e,
0xb6, 0x8a, 0x38, 0x2e, 0x97, 0xb1, 0x48, 0x2e, 0xca,
0xd7, 0xb1, 0x48, 0xa6, 0x90, 0x9a, 0x5c, 0xb2, 0xe0,
0xea, 0xdd, 0xfb, 0x84, 0xcc, 0xf9, 0x74, 0x44, 0x64,
0xf8, 0x2e, 0x16, 0x0b, 0xfa, 0x9b, 0x8b, 0x64, 0xf9,
0xd4, 0xc0, 0x3f, 0x99, 0x9b, 0x86, 0x43, 0xf6, 0x56,
0xb4, 0x12, 0xa3,
},
[]byte{0x02, 0xce, 0x0b, 0x14, 0xfb, 0x84, 0x2b, 0x1b,
0xa5, 0x49, 0xfd, 0xd6, 0x75, 0xc9, 0x80, 0x75, 0xf1,
0x2e, 0x9c, 0x51, 0x0f, 0x8e, 0xf5, 0x2b, 0xd0, 0x21,
0xa9, 0xa1, 0xf4, 0x80, 0x9d, 0x3b, 0x4d,
},
}
signDescriptors = []lnwallet.SignDescriptor{
lnwallet.SignDescriptor{
PrivateTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02,
},
WitnessScript: []byte{
0x00, 0x14, 0xee, 0x91, 0x41, 0x7e, 0x85, 0x6c, 0xde,
0x10, 0xa2, 0x91, 0x1e, 0xdc, 0xbd, 0xbd, 0x69, 0xe2,
0xef, 0xb5, 0x71, 0x48,
},
Output: &wire.TxOut{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
HashType: txscript.SigHashAll,
},
lnwallet.SignDescriptor{
PrivateTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02,
},
WitnessScript: []byte{
0x00, 0x14, 0xee, 0x91, 0x41, 0x7e, 0x85, 0x6c, 0xde,
0x10, 0xa2, 0x91, 0x1e, 0xdc, 0xbd, 0xbd, 0x69, 0xe2,
0xef, 0xb5, 0x71, 0x48,
},
Output: &wire.TxOut{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
HashType: txscript.SigHashAll,
},
lnwallet.SignDescriptor{
PrivateTweak: []byte{
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02, 0x02,
0x02, 0x02, 0x02, 0x02, 0x02,
},
WitnessScript: []byte{
0x00, 0x14, 0xee, 0x91, 0x41, 0x7e, 0x85, 0x6c, 0xde,
0x10, 0xa2, 0x91, 0x1e, 0xdc, 0xbd, 0xbd, 0x69, 0xe2,
0xef, 0xb5, 0x71, 0x48,
},
Output: &wire.TxOut{
Value: 5000000000,
PkScript: []byte{
0x41, // OP_DATA_65
0x04, 0xd6, 0x4b, 0xdf, 0xd0, 0x9e, 0xb1, 0xc5,
0xfe, 0x29, 0x5a, 0xbd, 0xeb, 0x1d, 0xca, 0x42,
0x81, 0xbe, 0x98, 0x8e, 0x2d, 0xa0, 0xb6, 0xc1,
0xc6, 0xa5, 0x9d, 0xc2, 0x26, 0xc2, 0x86, 0x24,
0xe1, 0x81, 0x75, 0xe8, 0x51, 0xc9, 0x6b, 0x97,
0x3d, 0x81, 0xb0, 0x1c, 0xc3, 0x1f, 0x04, 0x78,
0x34, 0xbc, 0x06, 0xd6, 0xd6, 0xed, 0xf6, 0x20,
0xd1, 0x84, 0x24, 0x1a, 0x6a, 0xed, 0x8b, 0x63,
0xa6, // 65-byte signature
0xac, // OP_CHECKSIG
},
},
HashType: txscript.SigHashAll,
},
}
kidOutputs = []kidOutput{
kidOutput{
amt: btcutil.Amount(13e7),
outPoint: outPoints[0],
blocksToMaturity: uint32(100),
confHeight: uint32(1770001),
},
kidOutput{
amt: btcutil.Amount(24e7),
outPoint: outPoints[1],
blocksToMaturity: uint32(50),
confHeight: uint32(22342321),
},
kidOutput{
amt: btcutil.Amount(2e5),
outPoint: outPoints[2],
blocksToMaturity: uint32(12),
confHeight: uint32(34241),
},
}
)
func TestAddSerializedKidsToList(t *testing.T) {
var b bytes.Buffer
for i := 0; i < 3; i++ {
kid := &kidOutputs[i]
descriptor := &signDescriptors[i]
pk, err := btcec.ParsePubKey(keys[i], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pub key: %v", keys[i])
}
descriptor.PubKey = pk
kid.signDescriptor = descriptor
if err := serializeKidOutput(&b, &kidOutputs[i]); err != nil {
t.Fatalf("unable to serialize and add kid output to list: %v", err)
}
}
kidList, err := deserializeKidList(&b)
if err != nil {
t.Fatalf("unable to deserialize kid output list: ", err)
}
for i := 0; i < 3; i++ {
if !reflect.DeepEqual(&kidOutputs[i], kidList[i]) {
t.Fatalf("kidOutputs don't match \n%+v\n%+v", &kidOutputs[i], kidList[i])
}
}
}
func TestSerializeKidOutput(t *testing.T) {
kid := &kidOutputs[0]
descriptor := &signDescriptors[0]
pk, err := btcec.ParsePubKey(keys[0], btcec.S256())
if err != nil {
t.Fatalf("unable to parse pub key: %v", keys[0])
}
descriptor.PubKey = pk
kid.signDescriptor = descriptor
var b bytes.Buffer
if err := serializeKidOutput(&b, kid); err != nil {
t.Fatalf("unable to serialize kid output: %v", err)
}
deserializedKid, err := deserializeKidOutput(&b)
if err != nil {
fmt.Printf(err.Error())
}
if !reflect.DeepEqual(kid, deserializedKid) {
t.Fatalf("kidOutputs don't match %+v vs %+v", kid, deserializedKid)
}
}