From bfa2a1d698bd2b38bb89afa0cf7867e06beec2b9 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Mon, 12 Sep 2016 12:37:51 -0700 Subject: [PATCH] lnd: introduce the utxoNursery which incubates outputs until maturity MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit introduces the utxoNursery. The duty of the utxoNursery is to watch over CSV-locked immature outputs until they’ve fully matured. An output is mature once both its sequence lock indicated by the CSV op code within its output has become active. Once an output is mature the nursery sweeps the outputs in batches into the source wallet. The utxoNursery executes its duties once a commitment transaction has been broadcast on-chain. --- log.go | 4 + server.go | 8 ++ utxonursery.go | 305 +++++++++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 317 insertions(+) create mode 100644 utxonursery.go diff --git a/log.go b/log.go index 07a56b50..e4c265cf 100644 --- a/log.go +++ b/log.go @@ -26,6 +26,7 @@ var ( ntfnLog = btclog.Disabled chdbLog = btclog.Disabled hswcLog = btclog.Disabled + utxnLog = btclog.Disabled ) // subsystemLoggers maps each subsystem identifier to its associated logger. @@ -39,6 +40,7 @@ var subsystemLoggers = map[string]btclog.Logger{ "CHDB": chdbLog, "FNDG": fndgLog, "HSWC": hswcLog, + "UTXN": utxnLog, } // useLogger updates the logger references for subsystemID to logger. Invalid @@ -79,6 +81,8 @@ func useLogger(subsystemID string, logger btclog.Logger) { case "HSWC": hswcLog = logger + case "UTXN": + utxnLog = logger } } diff --git a/server.go b/server.go index 381a7449..98835f9f 100644 --- a/server.go +++ b/server.go @@ -55,6 +55,8 @@ type server struct { routingMgr *routing.RoutingManager + utxoNursery *utxoNursery + newPeers chan *peer donePeers chan *peer queries chan interface{} @@ -104,6 +106,8 @@ func newServer(listenAddrs []string, notifier chainntnfs.ChainNotifier, // TODO(roasbeef): remove s.invoices.addInvoice(1000*1e8, *debugPre) + s.utxoNursery = newUtxoNursery(notifier, wallet) + // Create a new routing manager with ourself as the sole node within // the graph. s.routingMgr = routing.NewRoutingManager(graph.NewID(s.lightningID), nil) @@ -145,6 +149,9 @@ func (s *server) Start() error { if err := s.htlcSwitch.Start(); err != nil { return err } + if err := s.utxoNursery.Start(); err != nil { + return err + } s.routingMgr.Start() s.wg.Add(1) @@ -175,6 +182,7 @@ func (s *server) Stop() error { s.fundingMgr.Stop() s.routingMgr.Stop() s.htlcSwitch.Stop() + s.utxoNursery.Stop() s.lnwallet.Shutdown() diff --git a/utxonursery.go b/utxonursery.go new file mode 100644 index 00000000..a8f49b5b --- /dev/null +++ b/utxonursery.go @@ -0,0 +1,305 @@ +package main + +import ( + "sync" + + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lnd/chainntnfs" + "github.com/lightningnetwork/lnd/channeldb" + "github.com/lightningnetwork/lnd/lnwallet" + "github.com/roasbeef/btcd/txscript" + "github.com/roasbeef/btcd/wire" + "github.com/roasbeef/btcutil" +) + +// utxoNursery is a system dedicated to incubating time-locked outputs created +// by the broadcast of a commitment transaction either by us, or the remote +// peer. The nursery accepts outputs and "incubates" them until they've reached +// maturity, then sweep the outputs into the source wallet. An output is +// considered mature after the relative time-lock within the pkScript has +// passed. As outputs reach their maturity age, they're sweeped in batches into +// the source wallet, returning the outputs so they can be used within future +// channels, or regular Bitcoin transactions. +type utxoNursery struct { + sync.RWMutex + + notifier chainntnfs.ChainNotifier + wallet *lnwallet.LightningWallet + + db channeldb.DB + + requests chan *incubationRequest + + // TODO(roasbeef): persist to disk afterwards + unstagedOutputs map[wire.OutPoint]*immatureOutput + stagedOutputs map[uint32][]*immatureOutput + + started uint32 + stopped uint32 + quit chan struct{} + wg sync.WaitGroup +} + +// newUtxoNursery creates a new instance of the utxoNursery from a +// ChainNotifier and LightningWallet instance. +func newUtxoNursery(notifier chainntnfs.ChainNotifier, + wallet *lnwallet.LightningWallet) *utxoNursery { + + return &utxoNursery{ + notifier: notifier, + wallet: wallet, + requests: make(chan *incubationRequest), + unstagedOutputs: make(map[wire.OutPoint]*immatureOutput), + stagedOutputs: make(map[uint32][]*immatureOutput), + quit: make(chan struct{}), + } +} + +// Start launches all goroutines the utxoNursery needs to properly carry out +// its duties. +func (u *utxoNursery) Start() error { + u.wg.Add(1) + go u.incubator() + + return nil +} + +// Stop gracefully shutsdown any lingering goroutines launched during normal +// operation of the utxoNursery. +func (u *utxoNursery) Stop() error { + close(u.quit) + u.wg.Wait() + return nil +} + +// incubator is tasked with watching over all immature outputs until they've +// reached "maturity", after which they'll be sweeped into the underlying +// wallet in batches within a single transaction. Immature outputs can be +// divided into three stages: early stage, mid stage, and final stage. During +// the early stage, the transaction containing the output has not yet been +// confirmed. Once the txn creating the output is confirmed, then output moves +// to the mid stage wherein a dedicated goroutine waits until it has reached +// "maturity". Once an output is mature, it will be sweeped into the wallet at +// the earlier possible height. +func (u *utxoNursery) incubator() { + // Register with the notifier to receive notifications for each newly + // connected block. + newBlocks, err := u.notifier.RegisterBlockEpochNtfn() + if err != nil { + utxnLog.Errorf("unable to register for block epoch "+ + "notifications: %v", err) + } + + // Outputs that are transitioning from early to mid-stage are sent over + // this channel by each output's dedicated watcher goroutine. + midStageOutputs := make(chan *immatureOutput) +out: + for { + select { + case earlyStagers := <-u.requests: + utxnLog.Infof("Incubating %v new outputs", + len(earlyStagers.outputs)) + + for _, immatureUtxo := range earlyStagers.outputs { + outpoint := immatureUtxo.outPoint + sourceTXID := outpoint.Hash + + // Register for a confirmation once the + // generating txn has been confirmed. + confChan, err := u.notifier.RegisterConfirmationsNtfn(&sourceTXID, 1) + if err != nil { + utxnLog.Errorf("unable to register for confirmations "+ + "for txid: %v", sourceTXID) + continue + } + + // TODO(roasbeef): should be an on-disk + // at-least-once task queue + u.unstagedOutputs[outpoint] = immatureUtxo + + // Launch a dedicated goroutine which will send + // the output back to the incubator once the + // source txn has been confirmed. + go func() { + confHeight, ok := <-confChan.Confirmed + if !ok { + utxnLog.Errorf("notification chan "+ + "closed, can't advance output %v", outpoint) + } + + utxnLog.Infof("Outpoint %v confirmed in "+ + "block %v moving to mid-stage", + outpoint, confHeight) + immatureUtxo.confHeight = uint32(confHeight) + midStageOutputs <- immatureUtxo + }() + } + case midUtxo := <-midStageOutputs: + // The transaction creating the output has been + // created, so we move it from early stage to + // mid-stage. + delete(u.unstagedOutputs, midUtxo.outPoint) + + // TODO(roasbeef): your off-by-one sense are tingling... + maturityHeight := midUtxo.confHeight + midUtxo.blocksToMaturity + u.stagedOutputs[maturityHeight] = append(u.stagedOutputs[maturityHeight], midUtxo) + + utxnLog.Infof("Outpoint %v now mid-stage, will mature "+ + "at height %v (delay of %v)", midUtxo.outPoint, + maturityHeight, midUtxo.blocksToMaturity) + case epoch := <-newBlocks.Epochs: + // A new block has just been connected, check to see if + // we have any new outputs that can be swept into the + // wallet. + newHeight := uint32(epoch.Height) + matureOutputs, ok := u.stagedOutputs[newHeight] + if !ok { + continue + } + + utxnLog.Infof("New block: height=%v hash=%v, "+ + "sweeping %v mature outputs", newHeight, + epoch.Hash, len(matureOutputs)) + + // Create a transation which sweeps all the newly + // mature outputs into a output controlled by the + // wallet. + // TODO(roasbeef): can be more intelligent about + // buffering outputs to be more efficient on-chain. + sweepTx, err := u.createSweepTx(matureOutputs) + if err != nil { + // TODO(roasbeef): retry logic? + utxnLog.Errorf("unable to create sweep tx: %v", err) + continue + } + + utxnLog.Infof("Sweeping %v time-locked outputs "+ + "with sweep tx: %v", len(matureOutputs), + newLogClosure(func() string { + return spew.Sdump(sweepTx) + })) + + // With the sweep transaction fully signed, broadcast + // the transaction to the network. Additionally, we can + // stop tracking these outputs as they've just been + // sweeped. + err = u.wallet.PublishTransaction(sweepTx) + if err != nil { + utxnLog.Errorf("unable to broadcast sweep tx: %v, %v", + err, spew.Sdump(sweepTx)) + continue + } + delete(u.stagedOutputs, newHeight) + case <-u.quit: + break out + } + } + + u.wg.Done() +} + +// createSweepTx creates a final sweeping transaction with all witnesses +// inplace for all inputs. The created transaction has a single output sending +// all the funds back to the source wallet. +func (u *utxoNursery) createSweepTx(matureOutputs []*immatureOutput) (*wire.MsgTx, error) { + sweepAddr, err := u.wallet.NewAddress(lnwallet.WitnessPubKey, false) + if err != nil { + return nil, err + } + pkScript, err := txscript.PayToAddrScript(sweepAddr) + if err != nil { + return nil, err + } + + var totalSum btcutil.Amount + for _, o := range matureOutputs { + totalSum += o.amt + } + + sweepTx := wire.NewMsgTx() + sweepTx.Version = 2 + sweepTx.AddTxOut(&wire.TxOut{ + PkScript: pkScript, + Value: int64(totalSum - 1000), + }) + for _, utxo := range matureOutputs { + sweepTx.AddTxIn(&wire.TxIn{ + PreviousOutPoint: utxo.outPoint, + // TODO(roasbeef): assumes pure block delays + Sequence: utxo.blocksToMaturity, + }) + } + + // TODO(roasbeef): insert fee calculation + // * remove hardcoded fee above + + // With all the inputs in place, use each output's unique witness + // function to generate the final witness required for spending. + hashCache := txscript.NewTxSigHashes(sweepTx) + for i, txIn := range sweepTx.TxIn { + witness, err := matureOutputs[i].witnessFunc(sweepTx, hashCache, i) + if err != nil { + return nil, err + } + + txIn.Witness = witness + } + + return sweepTx, nil +} + +// witnessFunc represents a function which is able to generate the final +// witness for a particular public key script. This function acts as an +// abstraction layer, hidiing the details of the underlying script from the +// utxoNursery. +type witnessGenerator func(tx *wire.MsgTx, hc *txscript.TxSigHashes, inputIndex int) ([][]byte, error) + +// immatureOutput encapsulates an immature output. The struct includes a +// witnessGenerator closure which will be used to generate the witness required +// to sweep the output once it's mature. +// TODO(roasbeef): make into interface? can't gob functions +type immatureOutput struct { + amt btcutil.Amount + outPoint wire.OutPoint + + witnessFunc witnessGenerator + + // TODO(roasbeef): using block timeouts everywhere currently, will need + // to modify logic later to account for MTP based timeouts. + blocksToMaturity uint32 + confHeight uint32 +} + +// incubationRequest is a request to the utxoNursery to incubate a set of +// outputs until their mature, finally sweeping them into the wallet once +// available. +type incubationRequest struct { + outputs []*immatureOutput +} + +// incubateOutputs sends a request to utxoNursery to incubate the outputs +// defined within the summary of a closed channel. Induvidually, as all outputs +// reach maturity they'll be sweeped back into the wallet. +func (u *utxoNursery) incubateOutputs(closeSummary *lnwallet.ForceCloseSummary) { + // TODO(roasbeef): should use factory func here based on an interface + // * spend here also assumes delay is blocked bsaed, and in range + witnessFunc := func(tx *wire.MsgTx, hc *txscript.TxSigHashes, inputIndex int) ([][]byte, error) { + desc := closeSummary.SelfOutputSignDesc + desc.SigHashes = hc + desc.InputIndex = inputIndex + + return lnwallet.CommitSpendTimeout(u.wallet.Signer, desc, tx) + } + + outputAmt := btcutil.Amount(closeSummary.SelfOutputSignDesc.Output.Value) + selfOutput := &immatureOutput{ + amt: outputAmt, + outPoint: closeSummary.SelfOutpoint, + witnessFunc: witnessFunc, + blocksToMaturity: closeSummary.SelfOutputMaturity, + } + + u.requests <- &incubationRequest{ + outputs: []*immatureOutput{selfOutput}, + } +}