From fa513a76ad6dc49b8feb2b782d11557e9d2d91c6 Mon Sep 17 00:00:00 2001 From: Jim Posen Date: Fri, 10 Nov 2017 11:01:36 -0800 Subject: [PATCH] chainntnfs/neutrino: Handle block connects and disconnects in order. --- chainntnfs/neutrinonotify/neutrino.go | 205 ++++++++++++++------------ 1 file changed, 111 insertions(+), 94 deletions(-) diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index 87d28679..821252d1 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -65,8 +65,7 @@ type NeutrinoNotifier struct { rescanErr <-chan error - newBlocks *chainntnfs.ConcurrentQueue - staleBlocks *chainntnfs.ConcurrentQueue + chainUpdates *chainntnfs.ConcurrentQueue wg sync.WaitGroup quit chan struct{} @@ -96,9 +95,7 @@ func New(node *neutrino.ChainService) (*NeutrinoNotifier, error) { rescanErr: make(chan error), - newBlocks: chainntnfs.NewConcurrentQueue(10), - - staleBlocks: chainntnfs.NewConcurrentQueue(10), + chainUpdates: chainntnfs.NewConcurrentQueue(10), quit: make(chan struct{}), } @@ -150,8 +147,7 @@ func (n *NeutrinoNotifier) Start() error { n.chainView = n.p2pNode.NewRescan(rescanOptions...) n.rescanErr = n.chainView.Start() - n.newBlocks.Start() - n.staleBlocks.Start() + n.chainUpdates.Start() n.wg.Add(1) go n.notificationDispatcher() @@ -169,8 +165,7 @@ func (n *NeutrinoNotifier) Stop() error { close(n.quit) n.wg.Wait() - n.newBlocks.Stop() - n.staleBlocks.Stop() + n.chainUpdates.Stop() // Notify all pending clients of our shutdown by closing the related // notification channels. @@ -200,6 +195,10 @@ type filteredBlock struct { hash chainhash.Hash height uint32 txns []*btcutil.Tx + + // connected is true if this update is a new block and false if it is a + // disconnected block. + connect bool } // onFilteredBlockConnected is a callback which is executed each a new block is @@ -209,10 +208,11 @@ func (n *NeutrinoNotifier) onFilteredBlockConnected(height int32, // Append this new chain update to the end of the queue of new chain // updates. - n.newBlocks.ChanIn() <- &filteredBlock{ - hash: header.BlockHash(), - height: uint32(height), - txns: txns, + n.chainUpdates.ChanIn() <- &filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + txns: txns, + connect: true, } } @@ -223,9 +223,10 @@ func (n *NeutrinoNotifier) onFilteredBlockDisconnected(height int32, // Append this new chain update to the end of the queue of new chain // disconnects. - n.staleBlocks.ChanIn() <- &filteredBlock{ - hash: header.BlockHash(), - height: uint32(height), + n.chainUpdates.ChanIn() <- &filteredBlock{ + hash: header.BlockHash(), + height: uint32(height), + connect: false, } } @@ -318,87 +319,40 @@ func (n *NeutrinoNotifier) notificationDispatcher() { n.blockEpochClients[msg.epochID] = msg } - case item := <-n.newBlocks.ChanOut(): - newBlock := item.(*filteredBlock) - - n.heightMtx.Lock() - n.bestHeight = newBlock.height - n.heightMtx.Unlock() - - chainntnfs.Log.Infof("New block: height=%v, sha=%v", - newBlock.height, newBlock.hash) - - // First we'll notify any subscribed clients of the - // block. - n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) - - // Next, we'll scan over the list of relevant - // transactions and possibly dispatch notifications for - // confirmations and spends. - for _, tx := range newBlock.txns { - // Check if the inclusion of this transaction - // within a block by itself triggers a block - // confirmation threshold, if so send a - // notification. Otherwise, place the - // notification on a heap to be triggered in - // the future once additional confirmations are - // attained. - mtx := tx.MsgTx() - txIndex := tx.Index() - txSha := mtx.TxHash() - n.checkConfirmationTrigger(&txSha, newBlock, txIndex) - - for i, txIn := range mtx.TxIn { - prevOut := txIn.PreviousOutPoint - - // If this transaction indeed does - // spend an output which we have a - // registered notification for, then - // create a spend summary, finally - // sending off the details to the - // notification subscriber. - if clients, ok := n.spendNotifications[prevOut]; ok { - // TODO(roasbeef): many - // integration tests expect - // spend to be notified within - // the mempool. - spendDetails := &chainntnfs.SpendDetail{ - SpentOutPoint: &prevOut, - SpenderTxHash: &txSha, - SpendingTx: mtx, - SpenderInputIndex: uint32(i), - SpendingHeight: int32(newBlock.height), - } - - for _, ntfn := range clients { - chainntnfs.Log.Infof("Dispatching "+ - "spend notification for "+ - "outpoint=%v", ntfn.targetOutpoint) - ntfn.spendChan <- spendDetails - - // Close spendChan to ensure that any calls to Cancel will not - // block. This is safe to do since the channel is buffered, and the - // message can still be read by the receiver. - close(ntfn.spendChan) - } - - delete(n.spendNotifications, prevOut) - } - + case item := <-n.chainUpdates.ChanOut(): + update := item.(*filteredBlock) + if update.connect { + n.heightMtx.Lock() + if update.height != n.bestHeight+1 { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, new height=%d", + n.bestHeight, update.height) } + n.bestHeight = update.height + n.heightMtx.Unlock() + + chainntnfs.Log.Infof("New block: height=%v, sha=%v", + update.height, update.hash) + + err := n.handleBlockConnected(update) + if err != nil { + chainntnfs.Log.Error(err) + } + } else { + n.heightMtx.Lock() + if update.height != n.bestHeight { + chainntnfs.Log.Warnf("Received blocks out of order: "+ + "current height=%d, disconnected height=%d", + n.bestHeight, update.height) + } + + n.bestHeight = update.height - 1 + n.heightMtx.Unlock() + + chainntnfs.Log.Infof("Block disconnected from main chain: "+ + "height=%v, sha=%v", update.height, update.hash) } - - // A new block has been connected to the main chain. - // Send out any N confirmation notifications which may - // have been triggered by this new block. - n.notifyConfs(int32(newBlock.height)) - - case item := <-n.staleBlocks.ChanOut(): - staleBlock := item.(*filteredBlock) - chainntnfs.Log.Warnf("Block disconnected from main "+ - "chain: %v", staleBlock.hash) - case err := <-n.rescanErr: chainntnfs.Log.Errorf("Error during rescan: %v", err) @@ -525,6 +479,69 @@ chainScan: return true } +// handleBlocksConnected applies a chain update for a new block. Any watched +// transactions included this block will processed to either send notifications +// now or after numConfirmations confs. +func (n *NeutrinoNotifier) handleBlockConnected(newBlock *filteredBlock) error { + // First we'll notify any subscribed clients of the block. + n.notifyBlockEpochs(int32(newBlock.height), &newBlock.hash) + + // Next, we'll scan over the list of relevant transactions and possibly + // dispatch notifications for confirmations and spends. + for _, tx := range newBlock.txns { + // Check if the inclusion of this transaction within a block by itself + // triggers a block confirmation threshold, if so send a notification. + // Otherwise, place the notification on a heap to be triggered in the + // future once additional confirmations are attained. + mtx := tx.MsgTx() + txIndex := tx.Index() + txSha := mtx.TxHash() + n.checkConfirmationTrigger(&txSha, newBlock, txIndex) + + for i, txIn := range mtx.TxIn { + prevOut := txIn.PreviousOutPoint + + // If this transaction indeed does spend an output which we have a + // registered notification for, then create a spend summary, finally + // sending off the details to the notification subscriber. + clients, ok := n.spendNotifications[prevOut] + if !ok { + continue + } + + // TODO(roasbeef): many integration tests expect spend to be + // notified within the mempool. + spendDetails := &chainntnfs.SpendDetail{ + SpentOutPoint: &prevOut, + SpenderTxHash: &txSha, + SpendingTx: mtx, + SpenderInputIndex: uint32(i), + SpendingHeight: int32(newBlock.height), + } + + for _, ntfn := range clients { + chainntnfs.Log.Infof("Dispatching spend notification for "+ + "outpoint=%v", ntfn.targetOutpoint) + ntfn.spendChan <- spendDetails + + // Close spendChan to ensure that any calls to Cancel will not + // block. This is safe to do since the channel is buffered, and + // the message can still be read by the receiver. + close(ntfn.spendChan) + } + + delete(n.spendNotifications, prevOut) + } + } + + // A new block has been connected to the main chain. + // Send out any N confirmation notifications which may + // have been triggered by this new block. + n.notifyConfs(int32(newBlock.height)) + + return nil +} + // notifyBlockEpochs notifies all registered block epoch clients of the newly // connected block to the main chain. func (n *NeutrinoNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash) {