From 6d15be5b792ce05217ffa836a4b63ba65776df88 Mon Sep 17 00:00:00 2001 From: "Johan T. Halseth" Date: Mon, 2 Oct 2017 17:15:42 +0200 Subject: [PATCH] routing/chainview: use blockEventQueue for neutrino block events This commit makes use of the blockEventQueue within the neutrino implementation of FilteredChainView to ensure connected and disconnected blocks are consumed in order by the reader. It also specifies that neutrino is not to send disconnected blocks notifications during rescans, making it consistent with the btcd implementation. --- routing/chainview/neutrino.go | 127 +++++++++++++--------------------- 1 file changed, 50 insertions(+), 77 deletions(-) diff --git a/routing/chainview/neutrino.go b/routing/chainview/neutrino.go index 15d415b9..b877ce94 100644 --- a/routing/chainview/neutrino.go +++ b/routing/chainview/neutrino.go @@ -35,16 +35,10 @@ type CfFilteredChainView struct { // rescan will be sent over. rescanErrChan <-chan error - // newBlocks is the channel in which new filtered blocks are sent over. - newBlocks chan *FilteredBlock - - // staleBlocks is the channel in which blocks that have been - // disconnected from the mainchain are sent over. - staleBlocks chan *FilteredBlock - - // filterUpdates is a channel in which updates to the utxo filter - // attached to this instance are sent over. - filterUpdates chan filterUpdate + // blockEventQueue is the ordered queue used to keep the order + // of connected and disconnected blocks sent to the reader of the + // chainView. + blockQueue *blockEventQueue // chainFilter is the filterMtx sync.RWMutex @@ -65,11 +59,9 @@ var _ FilteredChainView = (*CfFilteredChainView)(nil) // this function. func NewCfFilteredChainView(node *neutrino.ChainService) (*CfFilteredChainView, error) { return &CfFilteredChainView{ - newBlocks: make(chan *FilteredBlock), - staleBlocks: make(chan *FilteredBlock), + blockQueue: newBlockEventQueue(), quit: make(chan struct{}), rescanErrChan: make(chan error), - filterUpdates: make(chan filterUpdate), chainFilter: make(map[wire.OutPoint]struct{}), p2pNode: node, }, nil @@ -122,6 +114,8 @@ func (c *CfFilteredChainView) Start() error { c.chainView = c.p2pNode.NewRescan(rescanOptions...) c.rescanErrChan = c.chainView.Start() + c.blockQueue.Start() + c.wg.Add(1) go c.chainFilterer() @@ -140,6 +134,7 @@ func (c *CfFilteredChainView) Stop() error { log.Infof("FilteredChainView stopping") close(c.quit) + c.blockQueue.Stop() c.wg.Wait() return nil @@ -164,13 +159,16 @@ func (c *CfFilteredChainView) onFilteredBlockConnected(height int32, } - go func() { - c.newBlocks <- &FilteredBlock{ - Hash: header.BlockHash(), - Height: uint32(height), - Transactions: mtxs, - } - }() + block := &FilteredBlock{ + Hash: header.BlockHash(), + Height: uint32(height), + Transactions: mtxs, + } + + c.blockQueue.Add(&blockEvent{ + eventType: connected, + block: block, + }) } // onFilteredBlockDisconnected is a callback which is executed once a block is @@ -178,59 +176,29 @@ func (c *CfFilteredChainView) onFilteredBlockConnected(height int32, func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32, header *wire.BlockHeader) { + log.Debugf("got disconnected block at height %d: %v", height, + header.BlockHash()) + filteredBlock := &FilteredBlock{ Hash: header.BlockHash(), Height: uint32(height), } - go func() { - c.staleBlocks <- filteredBlock - }() + c.blockQueue.Add(&blockEvent{ + eventType: disconnected, + block: filteredBlock, + }) } // chainFilterer is the primary coordination goroutine within the -// CfFilteredChainView. This goroutine handles errors from the running rescan, -// and also filter updates. +// CfFilteredChainView. This goroutine handles errors from the running rescan. func (c *CfFilteredChainView) chainFilterer() { defer c.wg.Done() for { select { - case err := <-c.rescanErrChan: log.Errorf("Error encountered during rescan: %v", err) - - // We've received a new update to the filter from the caller to - // mutate their established chain view. - case update := <-c.filterUpdates: - log.Debugf("Updating chain filter with new UTXO's: %v", - update.newUtxos) - - // First, we'll update the current chain view, by - // adding any new UTXO's, ignoring duplicates int he - // process. - c.filterMtx.Lock() - for _, op := range update.newUtxos { - c.chainFilter[op] = struct{}{} - } - c.filterMtx.Unlock() - - // With our internal chain view update, we'll craft a - // new update to the chainView which includes our new - // UTXO's, and current update height. - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddOutPoints(update.newUtxos...), - neutrino.Rewind(update.updateHeight), - } - err := c.chainView.Update(rescanUpdate...) - if err != nil { - log.Errorf("unable to update rescan: %v", err) - } - - if update.done != nil { - close(update.done) - } - case <-c.quit: return } @@ -343,27 +311,32 @@ func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredB // rewound to ensure all relevant notifications are dispatched. // // NOTE: This is part of the FilteredChainView interface. -func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error { - doneChan := make(chan struct{}) - update := filterUpdate{ - newUtxos: ops, - updateHeight: updateHeight, - done: doneChan, - } +func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, + updateHeight uint32) error { + log.Debugf("Updating chain filter with new UTXO's: %v", ops) - select { - case c.filterUpdates <- update: - case <-c.quit: - return fmt.Errorf("chain filter shutting down") + // First, we'll update the current chain view, by + // adding any new UTXO's, ignoring duplicates in the + // process. + c.filterMtx.Lock() + for _, op := range ops { + c.chainFilter[op] = struct{}{} } + c.filterMtx.Unlock() - select { - case <-doneChan: - return nil - case <-c.quit: - return fmt.Errorf("chain filter shutting down") + // With our internal chain view update, we'll craft a + // new update to the chainView which includes our new + // UTXO's, and current update height. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddOutPoints(ops...), + neutrino.Rewind(updateHeight), + neutrino.DisableDisconnectedNtfns(true), } - + err := c.chainView.Update(rescanUpdate...) + if err != nil { + return fmt.Errorf("unable to update rescan: %v", err) + } + return nil } // FilteredBlocks returns the channel that filtered blocks are to be sent over. @@ -373,7 +346,7 @@ func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uin // // NOTE: This is part of the FilteredChainView interface. func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { - return c.newBlocks + return c.blockQueue.newBlocks } // DisconnectedBlocks returns a receive only channel which will be sent upon @@ -382,5 +355,5 @@ func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { // // NOTE: This is part of the FilteredChainView interface. func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { - return c.staleBlocks + return c.blockQueue.staleBlocks }