chainntfs: use btcwallet's new NotificationServer for BtcdNotifier

* Rather than rely on the legacy notification interface which may be
deprecated in the near future, we’ll now switch to using the
NotificationSever.

* Negative confirmation notifications due to re-orgs are still
unimplemented however.
This commit is contained in:
Olaoluwa Osuntokun 2016-02-24 22:16:25 -08:00
parent 9fb8045bd2
commit 3d478cdd3e
1 changed files with 126 additions and 101 deletions

View File

@ -1,6 +1,7 @@
package btcdnotify package btcdnotify
import ( import (
"bytes"
"container/heap" "container/heap"
"fmt" "fmt"
"sync" "sync"
@ -8,6 +9,7 @@ import (
"github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcd/wire"
"github.com/btcsuite/btcwallet/chain" "github.com/btcsuite/btcwallet/chain"
btcwallet "github.com/btcsuite/btcwallet/wallet"
"github.com/btcsuite/btcwallet/wtxmgr" "github.com/btcsuite/btcwallet/wtxmgr"
"github.com/lightningnetwork/lnd/chainntfs" "github.com/lightningnetwork/lnd/chainntfs"
) )
@ -17,8 +19,8 @@ type BtcdNotifier struct {
started int32 // To be used atomically started int32 // To be used atomically
stopped int32 // To be used atomically stopped int32 // To be used atomically
// TODO(roasbeef): refactor to use the new NotificationServer ntfnSource *btcwallet.NotificationServer
conn ChainConnection chainConn *chain.RPCClient
notificationRegistry chan interface{} notificationRegistry chan interface{}
@ -32,8 +34,6 @@ type BtcdNotifier struct {
disconnectedBlocks <-chan wtxmgr.BlockMeta disconnectedBlocks <-chan wtxmgr.BlockMeta
relevantTxs <-chan chain.RelevantTx relevantTxs <-chan chain.RelevantTx
rpcConnected chan struct{}
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
} }
@ -41,10 +41,16 @@ type BtcdNotifier struct {
var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil)
// NewBtcdNotifier... // NewBtcdNotifier...
func NewBtcdNotifier(c ChainConnection) (*BtcdNotifier, error) { // TODO(roasbeef): chain client + notification sever
// TODO(roasbeef): take client also in order to get notifications? // * use server for notifications
// * when asked for spent, request via client
func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer,
chainConn *chain.RPCClient) (*BtcdNotifier, error) {
return &BtcdNotifier{ return &BtcdNotifier{
conn: c, ntfnSource: ntfnSource,
chainConn: chainConn,
notificationRegistry: make(chan interface{}), notificationRegistry: make(chan interface{}),
spendNotifications: make(map[wire.OutPoint]*spendNotification), spendNotifications: make(map[wire.OutPoint]*spendNotification),
@ -55,8 +61,6 @@ func NewBtcdNotifier(c ChainConnection) (*BtcdNotifier, error) {
disconnectedBlocks: make(chan wtxmgr.BlockMeta), disconnectedBlocks: make(chan wtxmgr.BlockMeta),
relevantTxs: make(chan chain.RelevantTx), relevantTxs: make(chan chain.RelevantTx),
rpcConnected: make(chan struct{}, 1),
quit: make(chan struct{}), quit: make(chan struct{}),
}, nil }, nil
} }
@ -70,7 +74,6 @@ func (b *BtcdNotifier) Start() error {
b.wg.Add(1) b.wg.Add(1)
go b.notificationDispatcher() go b.notificationDispatcher()
b.rpcConnected <- struct{}{} // TODO(roasbeef) why?
return nil return nil
} }
@ -90,12 +93,11 @@ func (b *BtcdNotifier) Stop() error {
// notificationDispatcher... // notificationDispatcher...
func (b *BtcdNotifier) notificationDispatcher() { func (b *BtcdNotifier) notificationDispatcher() {
ntfnClient := b.ntfnSource.TransactionNotifications()
out: out:
for { for {
select { select {
case <-b.rpcConnected:
err := b.initAllNotifications()
fmt.Println(err)
case registerMsg := <-b.notificationRegistry: case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) { switch msg := registerMsg.(type) {
case *spendNotification: case *spendNotification:
@ -103,114 +105,137 @@ out:
case *confirmationsNotification: case *confirmationsNotification:
b.confNotifications[*msg.txid] = msg b.confNotifications[*msg.txid] = msg
} }
case txNtfn := <-b.relevantTxs: case txNtfn := <-ntfnClient.C:
tx := txNtfn.TxRecord.MsgTx // We're only concerned with newly mined blocks which
txMined := txNtfn.Block != nil // may or may not include transactions we are interested
// in.
// First, check if this transaction spends an output if txNtfn.AttachedBlocks == nil {
// that has an existing spend notification for it.
for i, txIn := range tx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered notification
// for, then create a spend summary, finally
// sending off the details to the notification
// subscriber.
if ntfn, ok := b.spendNotifications[prevOut]; ok {
spenderSha := tx.TxSha()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: ntfn.targetOutpoint,
SpenderTxHash: &spenderSha,
// TODO(roasbeef): copy tx?
SpendingTx: &tx,
SpenderInputIndex: uint32(i),
}
ntfn.spendChan <- spendDetails
delete(b.spendNotifications, prevOut)
}
}
// If the transaction has been mined, then we check if
// a notification for the confirmation of this txid has
// been registered previously. Otherwise, we're done,
// for now.
if !txMined {
break break
} }
// If a confirmation notification has been registered newBlocks := txNtfn.AttachedBlocks
// for this txid, then either trigger a notification for _, block := range newBlocks {
// event if only a single confirmation notification was blockHeight := uint32(block.Height)
// requested, or place the notification on the
// confirmation heap for future usage. // Examine all transactions within the block
if confNtfn, ok := b.confNotifications[tx.TxSha()]; ok { // in order to determine if this block includes a
if confNtfn.numConfirmations == 1 { // transactions spending one of the registered
confNtfn.finConf <- struct{}{} // outpoints of interest.
break for _, txSummary := range block.Transactions {
txBytes := bytes.NewReader(txSummary.Transaction)
tx := wire.NewMsgTx()
if err := tx.Deserialize(txBytes); err != nil {
// TODO(roasbeef): err
fmt.Println("unable to des tx: ", err)
continue
}
// Check if the inclusion of this transaction
// within a block by itself triggers a block
// confirmation threshold, if so send a
// notification. Otherwise, place the notification
// on a heap to be triggered in the future once
// additional confirmations are attained.
txSha := tx.TxSha()
b.checkConfirmationTrigger(&txSha, blockHeight)
// Next, examine all the inputs spent, firing
// of a notification if it spends any of the
// outpoints within the set of our registered
// outputs.
b.checkSpendTrigger(tx)
} }
// The registered notification requires more // A new block has been connected to the main
// than one confirmation before triggering. So // chain. Send out any N confirmation notifications
// we create a heapConf entry for this notification. // which may have been triggered by this new block.
// The heapConf allows us to easily keep track of b.notifyConfs(blockHeight)
// which notification(s) we should fire off with
// each incoming block.
confNtfn.initialConfirmHeight = uint32(txNtfn.Block.Height)
heapEntry := &confEntry{
confNtfn,
confNtfn.initialConfirmHeight + confNtfn.numConfirmations,
}
heap.Push(b.confHeap, heapEntry)
}
case blockNtfn := <-b.connectedBlocks:
blockHeight := uint32(blockNtfn.Height)
// Traverse our confirmation heap. The heap is a
// min-heap, so the confirmation notification which requires
// the smallest block-height will always be at the top
// of the heap. If a confirmation notification is eligible
// for triggering, then fire it off, and check if another
// is eligible until there are no more eligible entries.
nextConf := heap.Pop(b.confHeap).(*confEntry)
for nextConf.triggerHeight <= blockHeight {
nextConf.finConf <- struct{}{}
nextConf = heap.Pop(b.confHeap).(*confEntry)
} }
heap.Push(b.confHeap, nextConf)
case delBlockNtfn := <-b.disconnectedBlocks:
// TODO(roasbeef): re-orgs // TODO(roasbeef): re-orgs
// * second channel to notify of confirmation decrementing // * second channel to notify of confirmation decrementing
// re-org? // re-org?
// * notify of negative confirmations // * notify of negative confirmations
fmt.Println(delBlockNtfn) fmt.Println(txNtfn.DetachedBlocks)
case <-b.quit: case <-b.quit:
break out break out
} }
} }
} }
// initAllNotifications... // notifyConfs...
func (b *BtcdNotifier) initAllNotifications() error { func (b *BtcdNotifier) notifyConfs(newBlockHeight uint32) {
var err error // Traverse our confirmation heap. The heap is a
// min-heap, so the confirmation notification which requires
// the smallest block-height will always be at the top
// of the heap. If a confirmation notification is eligible
// for triggering, then fire it off, and check if another
// is eligible until there are no more eligible entries.
nextConf := heap.Pop(b.confHeap).(*confEntry)
for nextConf.triggerHeight <= newBlockHeight {
nextConf.finConf <- struct{}{}
b.connectedBlocks, err = b.conn.ListenConnectedBlocks() nextConf = heap.Pop(b.confHeap).(*confEntry)
if err != nil {
return err
}
b.disconnectedBlocks, err = b.conn.ListenDisconnectedBlocks()
if err != nil {
return err
}
b.relevantTxs, err = b.conn.ListenRelevantTxs()
if err != nil {
return err
} }
return nil heap.Push(b.confHeap, nextConf)
}
// checkSpendTrigger...
func (b *BtcdNotifier) checkSpendTrigger(tx *wire.MsgTx) {
// First, check if this transaction spends an output
// that has an existing spend notification for it.
for i, txIn := range tx.TxIn {
prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an
// output which we have a registered notification
// for, then create a spend summary, finally
// sending off the details to the notification
// subscriber.
if ntfn, ok := b.spendNotifications[prevOut]; ok {
spenderSha := tx.TxSha()
spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: ntfn.targetOutpoint,
SpenderTxHash: &spenderSha,
// TODO(roasbeef): copy tx?
SpendingTx: tx,
SpenderInputIndex: uint32(i),
}
ntfn.spendChan <- spendDetails
delete(b.spendNotifications, prevOut)
}
}
}
// checkConfirmationTrigger...
func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash,
blockHeight uint32) {
// If a confirmation notification has been registered
// for this txid, then either trigger a notification
// event if only a single confirmation notification was
// requested, or place the notification on the
// confirmation heap for future usage.
if confNtfn, ok := b.confNotifications[*txSha]; ok {
if confNtfn.numConfirmations == 1 {
confNtfn.finConf <- struct{}{}
return
}
// The registered notification requires more
// than one confirmation before triggering. So
// we create a heapConf entry for this notification.
// The heapConf allows us to easily keep track of
// which notification(s) we should fire off with
// each incoming block.
confNtfn.initialConfirmHeight = blockHeight
heapEntry := &confEntry{
confNtfn,
confNtfn.initialConfirmHeight + confNtfn.numConfirmations,
}
heap.Push(b.confHeap, heapEntry)
}
} }
// spendNotification.... // spendNotification....