diff --git a/chainntfs/btcdnotify/btcd.go b/chainntfs/btcdnotify/btcd.go index 1da5a21d..b9951392 100644 --- a/chainntfs/btcdnotify/btcd.go +++ b/chainntfs/btcdnotify/btcd.go @@ -1,16 +1,16 @@ package btcdnotify import ( - "bytes" "container/heap" "fmt" "sync" "sync/atomic" + "time" + "github.com/btcsuite/btcd/btcjson" "github.com/btcsuite/btcd/wire" - "github.com/btcsuite/btcwallet/chain" - btcwallet "github.com/btcsuite/btcwallet/wallet" - "github.com/btcsuite/btcwallet/wtxmgr" + "github.com/btcsuite/btcrpcclient" + "github.com/btcsuite/btcutil" "github.com/lightningnetwork/lnd/chainntfs" ) @@ -19,8 +19,7 @@ type BtcdNotifier struct { started int32 // To be used atomically stopped int32 // To be used atomically - ntfnSource *btcwallet.NotificationServer - chainConn *chain.RPCClient + chainConn *btcrpcclient.Client notificationRegistry chan interface{} @@ -30,9 +29,9 @@ type BtcdNotifier struct { confNotifications map[wire.ShaHash]*confirmationsNotification confHeap *confirmationHeap - connectedBlocks <-chan wtxmgr.BlockMeta - disconnectedBlocks <-chan wtxmgr.BlockMeta - relevantTxs <-chan chain.RelevantTx + connectedBlockHashes chan *blockNtfn + disconnectedBlockHashes chan *blockNtfn + relevantTxs chan *btcutil.Tx wg sync.WaitGroup quit chan struct{} @@ -41,15 +40,12 @@ type BtcdNotifier struct { var _ chainntnfs.ChainNotifier = (*BtcdNotifier)(nil) // NewBtcdNotifier... -// TODO(roasbeef): chain client + notification sever -// * use server for notifications +// TODO(roasbeef): // * when asked for spent, request via client -func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer, - chainConn *chain.RPCClient) (*BtcdNotifier, error) { - - return &BtcdNotifier{ - ntfnSource: ntfnSource, - chainConn: chainConn, +//func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer, +// chainConn *chain.RPCClient) (*BtcdNotifier, error) { +func NewBtcdNotifier(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) { + notifier := &BtcdNotifier{ notificationRegistry: make(chan interface{}), @@ -57,12 +53,28 @@ func NewBtcdNotifier(ntfnSource *btcwallet.NotificationServer, confNotifications: make(map[wire.ShaHash]*confirmationsNotification), confHeap: newConfirmationHeap(), - connectedBlocks: make(chan wtxmgr.BlockMeta), - disconnectedBlocks: make(chan wtxmgr.BlockMeta), - relevantTxs: make(chan chain.RelevantTx), + connectedBlockHashes: make(chan *blockNtfn, 20), + disconnectedBlockHashes: make(chan *blockNtfn, 20), + relevantTxs: make(chan *btcutil.Tx, 100), quit: make(chan struct{}), - }, nil + } + + ntfnCallbacks := &btcrpcclient.NotificationHandlers{ + OnBlockConnected: notifier.onBlockConnected, + OnBlockDisconnected: notifier.onBlockDisconnected, + OnRedeemingTx: notifier.onRedeemingTx, + } + + config.DisableConnectOnNew = true + config.DisableAutoReconnect = false + chainConn, err := btcrpcclient.New(config, ntfnCallbacks) + if err != nil { + return nil, err + } + notifier.chainConn = chainConn + + return notifier, nil } // Start... @@ -72,6 +84,14 @@ func (b *BtcdNotifier) Start() error { return nil } + if err := b.chainConn.Connect(20); err != nil { + return err + } + + if err := b.chainConn.NotifyBlocks(); err != nil { + return err + } + b.wg.Add(1) go b.notificationDispatcher() @@ -85,16 +105,43 @@ func (b *BtcdNotifier) Stop() error { return nil } + b.chainConn.Shutdown() + close(b.quit) b.wg.Wait() return nil } +// connectedBlock... +type blockNtfn struct { + sha *wire.ShaHash + height int32 +} + +// onBlockConnected... +func (b *BtcdNotifier) onBlockConnected(hash *wire.ShaHash, height int32, t time.Time) { + select { + case b.connectedBlockHashes <- &blockNtfn{hash, height}: + case <-b.quit: + } +} + +// onBlockDisconnected... +func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t time.Time) { + b.onBlockDisconnected(hash, height, t) +} + +// onRedeemingTx... +func (b *BtcdNotifier) onRedeemingTx(transaction *btcutil.Tx, details *btcjson.BlockDetails) { + select { + case b.relevantTxs <- transaction: + case <-b.quit: + } +} + // notificationDispatcher... func (b *BtcdNotifier) notificationDispatcher() { - ntfnClient := b.ntfnSource.TransactionNotifications() - out: for { select { @@ -105,66 +152,73 @@ out: case *confirmationsNotification: b.confNotifications[*msg.txid] = msg } - case txNtfn := <-ntfnClient.C: - // We're only concerned with newly mined blocks which - // may or may not include transactions we are interested - // in. - if txNtfn.AttachedBlocks == nil { - break - } - - newBlocks := txNtfn.AttachedBlocks - for _, block := range newBlocks { - blockHeight := uint32(block.Height) - - // Examine all transactions within the block - // in order to determine if this block includes a - // transactions spending one of the registered - // outpoints of interest. - for _, txSummary := range block.Transactions { - txBytes := bytes.NewReader(txSummary.Transaction) - tx := wire.NewMsgTx() - if err := tx.Deserialize(txBytes); err != nil { - // TODO(roasbeef): err - fmt.Println("unable to des tx: ", err) - continue - } - - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the notification - // on a heap to be triggered in the future once - // additional confirmations are attained. - txSha := tx.TxSha() - b.checkConfirmationTrigger(&txSha, blockHeight) - - // Next, examine all the inputs spent, firing - // of a notification if it spends any of the - // outpoints within the set of our registered - // outputs. - b.checkSpendTrigger(tx) - } - - // A new block has been connected to the main - // chain. Send out any N confirmation notifications - // which may have been triggered by this new block. - b.notifyConfs(blockHeight) - } - + case staleBlockHash := <-b.disconnectedBlockHashes: // TODO(roasbeef): re-orgs // * second channel to notify of confirmation decrementing // re-org? // * notify of negative confirmations - fmt.Println(txNtfn.DetachedBlocks) + fmt.Println(staleBlockHash) + case connectedBlock := <-b.connectedBlockHashes: + newBlock, err := b.chainConn.GetBlock(connectedBlock.sha) + if err != nil { + continue + } + + newHeight := connectedBlock.height + for _, tx := range newBlock.Transactions() { + // Check if the inclusion of this transaction + // within a block by itself triggers a block + // confirmation threshold, if so send a + // notification. Otherwise, place the notification + // on a heap to be triggered in the future once + // additional confirmations are attained. + txSha := tx.Sha() + b.checkConfirmationTrigger(txSha, newHeight) + } + + // A new block has been connected to the main + // chain. Send out any N confirmation notifications + // which may have been triggered by this new block. + b.notifyConfs(newHeight) + case newSpend := <-b.relevantTxs: + // First, check if this transaction spends an output + // that has an existing spend notification for it. + for i, txIn := range newSpend.MsgTx().TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an + // output which we have a registered notification + // for, then create a spend summary, finally + // sending off the details to the notification + // subscriber. + if ntfn, ok := b.spendNotifications[prevOut]; ok { + spenderSha := newSpend.Sha() + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: ntfn.targetOutpoint, + SpenderTxHash: spenderSha, + // TODO(roasbeef): copy tx? + SpendingTx: newSpend.MsgTx(), + SpenderInputIndex: uint32(i), + } + + ntfn.spendChan <- spendDetails + delete(b.spendNotifications, prevOut) + } + } case <-b.quit: break out } } + b.wg.Done() } // notifyConfs... -func (b *BtcdNotifier) notifyConfs(newBlockHeight uint32) { +func (b *BtcdNotifier) notifyConfs(newBlockHeight int32) { + // If the heap is empty, we have nothing to do. + if b.confHeap.Len() == 0 { + return + } + // Traverse our confirmation heap. The heap is a // min-heap, so the confirmation notification which requires // the smallest block-height will always be at the top @@ -172,52 +226,29 @@ func (b *BtcdNotifier) notifyConfs(newBlockHeight uint32) { // for triggering, then fire it off, and check if another // is eligible until there are no more eligible entries. nextConf := heap.Pop(b.confHeap).(*confEntry) - for nextConf.triggerHeight <= newBlockHeight { + for nextConf.triggerHeight <= uint32(newBlockHeight) { nextConf.finConf <- struct{}{} + if b.confHeap.Len() == 0 { + return + } + nextConf = heap.Pop(b.confHeap).(*confEntry) } heap.Push(b.confHeap, nextConf) } -// checkSpendTrigger... -func (b *BtcdNotifier) checkSpendTrigger(tx *wire.MsgTx) { - // First, check if this transaction spends an output - // that has an existing spend notification for it. - for i, txIn := range tx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does spend an - // output which we have a registered notification - // for, then create a spend summary, finally - // sending off the details to the notification - // subscriber. - if ntfn, ok := b.spendNotifications[prevOut]; ok { - spenderSha := tx.TxSha() - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: ntfn.targetOutpoint, - SpenderTxHash: &spenderSha, - // TODO(roasbeef): copy tx? - SpendingTx: tx, - SpenderInputIndex: uint32(i), - } - - ntfn.spendChan <- spendDetails - delete(b.spendNotifications, prevOut) - } - } -} - // checkConfirmationTrigger... -func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, - blockHeight uint32) { +// TODO(roasbeef): perheps lookup, then track by inputs instead? +func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, blockHeight int32) { // If a confirmation notification has been registered // for this txid, then either trigger a notification // event if only a single confirmation notification was // requested, or place the notification on the // confirmation heap for future usage. if confNtfn, ok := b.confNotifications[*txSha]; ok { + delete(b.confNotifications, *txSha) if confNtfn.numConfirmations == 1 { confNtfn.finConf <- struct{}{} return @@ -229,10 +260,11 @@ func (b *BtcdNotifier) checkConfirmationTrigger(txSha *wire.ShaHash, // The heapConf allows us to easily keep track of // which notification(s) we should fire off with // each incoming block. - confNtfn.initialConfirmHeight = blockHeight + confNtfn.initialConfirmHeight = uint32(blockHeight) + finalConfHeight := uint32(confNtfn.initialConfirmHeight + confNtfn.numConfirmations - 1) heapEntry := &confEntry{ confNtfn, - confNtfn.initialConfirmHeight + confNtfn.numConfirmations, + finalConfHeight, } heap.Push(b.confHeap, heapEntry) } @@ -248,8 +280,9 @@ type spendNotification struct { // RegisterSpendNotification... // NOTE: eventChan MUST be buffered func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint) (*chainntnfs.SpendEvent, error) { - - // TODO(roasbeef): also register with rpc client? bool? + if err := b.chainConn.NotifySpent([]*wire.OutPoint{outpoint}); err != nil { + return nil, err + } ntfn := &spendNotification{ targetOutpoint: outpoint, @@ -270,7 +303,7 @@ type confirmationsNotification struct { numConfirmations uint32 finConf chan struct{} - negativeConf chan uint32 + negativeConf chan int32 } // RegisterConfirmationsNotification... @@ -281,7 +314,7 @@ func (b *BtcdNotifier) RegisterConfirmationsNtfn(txid *wire.ShaHash, txid: txid, numConfirmations: numConfs, finConf: make(chan struct{}, 1), - negativeConf: make(chan uint32, 1), + negativeConf: make(chan int32, 1), } b.notificationRegistry <- ntfn