diff --git a/chainntfs/btcdnotify/btcd.go b/chainntfs/btcdnotify/btcd.go index eb6e53f5..1da5a21d 100644 --- a/chainntfs/btcdnotify/btcd.go +++ b/chainntfs/btcdnotify/btcd.go @@ -1,6 +1,7 @@ package btcdnotify import ( + "bytes" "container/heap" "fmt" "sync" @@ -8,6 +9,7 @@ import ( "github.com/btcsuite/btcd/wire" "github.com/btcsuite/btcwallet/chain" + btcwallet "github.com/btcsuite/btcwallet/wallet" "github.com/btcsuite/btcwallet/wtxmgr" "github.com/lightningnetwork/lnd/chainntfs" ) @@ -17,8 +19,8 @@ type BtcdNotifier struct { started int32 // To be used atomically stopped int32 // To be used atomically - // TODO(roasbeef): refactor to use the new NotificationServer - conn ChainConnection + ntfnSource *btcwallet.NotificationServer + chainConn *chain.RPCClient notificationRegistry chan interface{} @@ -32,8 +34,6 @@ type BtcdNotifier struct { disconnectedBlocks <-chan wtxmgr.BlockMeta relevantTxs <-chan chain.RelevantTx - rpcConnected chan struct{} - wg sync.WaitGroup quit chan struct{} } @@ -41,10 +41,16 @@ type BtcdNotifier struct { var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // NewBtcdNotifier... -func NewBtcdNotifier(c ChainConnection) (*BtcdNotifier, error) { - // TODO(roasbeef): take client also in order to get notifications? +// TODO(roasbeef): chain client + notification sever +// * use server for notifications +// * when asked for spent, request via client +func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer, + chainConn *chain.RPCClient) (*BtcdNotifier, error) { + return &BtcdNotifier{ - conn: c, + ntfnSource: ntfnSource, + chainConn: chainConn, + notificationRegistry: make(chan interface{}), spendNotifications: make(map[wire.OutPoint]*spendNotification), @@ -55,8 +61,6 @@ func NewBtcdNotifier(c ChainConnection) (*BtcdNotifier, error) { disconnectedBlocks: make(chan wtxmgr.BlockMeta), relevantTxs: make(chan chain.RelevantTx), - rpcConnected: make(chan struct{}, 1), - quit: make(chan struct{}), }, nil } @@ -70,7 +74,6 @@ func (b *BtcdNotifier) Start() error { b.wg.Add(1) go b.notificationDispatcher() - b.rpcConnected <- struct{}{} // TODO(roasbeef) why? return nil } @@ -90,12 +93,11 @@ func (b *BtcdNotifier) Stop() error { // notificationDispatcher... func (b *BtcdNotifier) notificationDispatcher() { + ntfnClient := b.ntfnSource.TransactionNotifications() + out: for { select { - case <-b.rpcConnected: - err := b.initAllNotifications() - fmt.Println(err) case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { case *spendNotification: @@ -103,114 +105,137 @@ out: case *confirmationsNotification: b.confNotifications[*msg.txid] = msg } - case txNtfn := <-b.relevantTxs: - tx := txNtfn.TxRecord.MsgTx - txMined := txNtfn.Block != nil - - // First, check if this transaction spends an output - // that has an existing spend notification for it. - for i, txIn := range tx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an - // output which we have a registered notification - // for, then create a spend summary, finally - // sending off the details to the notification - // subscriber. - if ntfn, ok := b.spendNotifications[prevOut]; ok { - spenderSha := tx.TxSha() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: ntfn.targetOutpoint, - SpenderTxHash: &spenderSha, - // TODO(roasbeef): copy tx? - SpendingTx: &tx, - SpenderInputIndex: uint32(i), - } - - ntfn.spendChan <- spendDetails - delete(b.spendNotifications, prevOut) - } - } - - // If the transaction has been mined, then we check if - // a notification for the confirmation of this txid has - // been registered previously. Otherwise, we're done, - // for now. - if !txMined { + case txNtfn := <-ntfnClient.C: + // We're only concerned with newly mined blocks which + // may or may not include transactions we are interested + // in. + if txNtfn.AttachedBlocks == nil { break } - // If a confirmation notification has been registered - // for this txid, then either trigger a notification - // event if only a single confirmation notification was - // requested, or place the notification on the - // confirmation heap for future usage. - if confNtfn, ok := b.confNotifications[tx.TxSha()]; ok { - if confNtfn.numConfirmations == 1 { - confNtfn.finConf <- struct{}{} - break + newBlocks := txNtfn.AttachedBlocks + for _, block := range newBlocks { + blockHeight := uint32(block.Height) + + // Examine all transactions within the block + // in order to determine if this block includes a + // transactions spending one of the registered + // outpoints of interest. + for _, txSummary := range block.Transactions { + txBytes := bytes.NewReader(txSummary.Transaction) + tx := wire.NewMsgTx() + if err := tx.Deserialize(txBytes); err != nil { + // TODO(roasbeef): err + fmt.Println("unable to des tx: ", err) + continue + } + + // Check if the inclusion of this transaction + // within a block by itself triggers a block + // confirmation threshold, if so send a + // notification. Otherwise, place the notification + // on a heap to be triggered in the future once + // additional confirmations are attained. + txSha := tx.TxSha() + b.checkConfirmationTrigger(&txSha, blockHeight) + + // Next, examine all the inputs spent, firing + // of a notification if it spends any of the + // outpoints within the set of our registered + // outputs. + b.checkSpendTrigger(tx) } - // The registered notification requires more - // than one confirmation before triggering. So - // we create a heapConf entry for this notification. - // The heapConf allows us to easily keep track of - // which notification(s) we should fire off with - // each incoming block. - confNtfn.initialConfirmHeight = uint32(txNtfn.Block.Height) - heapEntry := &confEntry{ - confNtfn, - confNtfn.initialConfirmHeight + confNtfn.numConfirmations, - } - heap.Push(b.confHeap, heapEntry) - } - case blockNtfn := <-b.connectedBlocks: - blockHeight := uint32(blockNtfn.Height) - - // Traverse our confirmation heap. The heap is a - // min-heap, so the confirmation notification which requires - // the smallest block-height will always be at the top - // of the heap. If a confirmation notification is eligible - // for triggering, then fire it off, and check if another - // is eligible until there are no more eligible entries. - nextConf := heap.Pop(b.confHeap).(*confEntry) - for nextConf.triggerHeight <= blockHeight { - nextConf.finConf <- struct{}{} - - nextConf = heap.Pop(b.confHeap).(*confEntry) + // A new block has been connected to the main + // chain. Send out any N confirmation notifications + // which may have been triggered by this new block. + b.notifyConfs(blockHeight) } - heap.Push(b.confHeap, nextConf) - case delBlockNtfn := <-b.disconnectedBlocks: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing // re-org? // * notify of negative confirmations - fmt.Println(delBlockNtfn) + fmt.Println(txNtfn.DetachedBlocks) case <-b.quit: break out } } } -// initAllNotifications... -func (b *BtcdNotifier) initAllNotifications() error { - var err error +// notifyConfs... +func (b *BtcdNotifier) notifyConfs(newBlockHeight uint32) { + // Traverse our confirmation heap. The heap is a + // min-heap, so the confirmation notification which requires + // the smallest block-height will always be at the top + // of the heap. If a confirmation notification is eligible + // for triggering, then fire it off, and check if another + // is eligible until there are no more eligible entries. + nextConf := heap.Pop(b.confHeap).(*confEntry) + for nextConf.triggerHeight <= newBlockHeight { + nextConf.finConf <- struct{}{} - b.connectedBlocks, err = b.conn.ListenConnectedBlocks() - if err != nil { - return err - } - b.disconnectedBlocks, err = b.conn.ListenDisconnectedBlocks() - if err != nil { - return err - } - b.relevantTxs, err = b.conn.ListenRelevantTxs() - if err != nil { - return err + nextConf = heap.Pop(b.confHeap).(*confEntry) } - return nil + heap.Push(b.confHeap, nextConf) +} + +// checkSpendTrigger... +func (b *BtcdNotifier) checkSpendTrigger(tx *wire.MsgTx) { + // First, check if this transaction spends an output + // that has an existing spend notification for it. + for i, txIn := range tx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an + // output which we have a registered notification + // for, then create a spend summary, finally + // sending off the details to the notification + // subscriber. + if ntfn, ok := b.spendNotifications[prevOut]; ok { + spenderSha := tx.TxSha() + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: ntfn.targetOutpoint, + SpenderTxHash: &spenderSha, + // TODO(roasbeef): copy tx? + SpendingTx: tx, + SpenderInputIndex: uint32(i), + } + + ntfn.spendChan <- spendDetails + delete(b.spendNotifications, prevOut) + } + } +} + +// checkConfirmationTrigger... +func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, + blockHeight uint32) { + // If a confirmation notification has been registered + // for this txid, then either trigger a notification + // event if only a single confirmation notification was + // requested, or place the notification on the + // confirmation heap for future usage. + if confNtfn, ok := b.confNotifications[*txSha]; ok { + if confNtfn.numConfirmations == 1 { + confNtfn.finConf <- struct{}{} + return + } + + // The registered notification requires more + // than one confirmation before triggering. So + // we create a heapConf entry for this notification. + // The heapConf allows us to easily keep track of + // which notification(s) we should fire off with + // each incoming block. + confNtfn.initialConfirmHeight = blockHeight + heapEntry := &confEntry{ + confNtfn, + confNtfn.initialConfirmHeight + confNtfn.numConfirmations, + } + heap.Push(b.confHeap, heapEntry) + } } // spendNotification....