diff --git a/routing/chainview/neutrino.go b/routing/chainview/neutrino.go index 15d415b9..b877ce94 100644 --- a/routing/chainview/neutrino.go +++ b/routing/chainview/neutrino.go @@ -35,16 +35,10 @@ type CfFilteredChainView struct { // rescan will be sent over. rescanErrChan <-chan error - // newBlocks is the channel in which new filtered blocks are sent over. - newBlocks chan *FilteredBlock - - // staleBlocks is the channel in which blocks that have been - // disconnected from the mainchain are sent over. - staleBlocks chan *FilteredBlock - - // filterUpdates is a channel in which updates to the utxo filter - // attached to this instance are sent over. - filterUpdates chan filterUpdate + // blockEventQueue is the ordered queue used to keep the order + // of connected and disconnected blocks sent to the reader of the + // chainView. + blockQueue *blockEventQueue // chainFilter is the filterMtx sync.RWMutex @@ -65,11 +59,9 @@ var _ FilteredChainView = (*CfFilteredChainView)(nil) // this function. func NewCfFilteredChainView(node *neutrino.ChainService) (*CfFilteredChainView, error) { return &CfFilteredChainView{ - newBlocks: make(chan *FilteredBlock), - staleBlocks: make(chan *FilteredBlock), + blockQueue: newBlockEventQueue(), quit: make(chan struct{}), rescanErrChan: make(chan error), - filterUpdates: make(chan filterUpdate), chainFilter: make(map[wire.OutPoint]struct{}), p2pNode: node, }, nil @@ -122,6 +114,8 @@ func (c *CfFilteredChainView) Start() error { c.chainView = c.p2pNode.NewRescan(rescanOptions...) c.rescanErrChan = c.chainView.Start() + c.blockQueue.Start() + c.wg.Add(1) go c.chainFilterer() @@ -140,6 +134,7 @@ func (c *CfFilteredChainView) Stop() error { log.Infof("FilteredChainView stopping") close(c.quit) + c.blockQueue.Stop() c.wg.Wait() return nil @@ -164,13 +159,16 @@ func (c *CfFilteredChainView) onFilteredBlockConnected(height int32, } - go func() { - c.newBlocks <- &FilteredBlock{ - Hash: header.BlockHash(), - Height: uint32(height), - Transactions: mtxs, - } - }() + block := &FilteredBlock{ + Hash: header.BlockHash(), + Height: uint32(height), + Transactions: mtxs, + } + + c.blockQueue.Add(&blockEvent{ + eventType: connected, + block: block, + }) } // onFilteredBlockDisconnected is a callback which is executed once a block is @@ -178,59 +176,29 @@ func (c *CfFilteredChainView) onFilteredBlockConnected(height int32, func (c *CfFilteredChainView) onFilteredBlockDisconnected(height int32, header *wire.BlockHeader) { + log.Debugf("got disconnected block at height %d: %v", height, + header.BlockHash()) + filteredBlock := &FilteredBlock{ Hash: header.BlockHash(), Height: uint32(height), } - go func() { - c.staleBlocks <- filteredBlock - }() + c.blockQueue.Add(&blockEvent{ + eventType: disconnected, + block: filteredBlock, + }) } // chainFilterer is the primary coordination goroutine within the -// CfFilteredChainView. This goroutine handles errors from the running rescan, -// and also filter updates. +// CfFilteredChainView. This goroutine handles errors from the running rescan. func (c *CfFilteredChainView) chainFilterer() { defer c.wg.Done() for { select { - case err := <-c.rescanErrChan: log.Errorf("Error encountered during rescan: %v", err) - - // We've received a new update to the filter from the caller to - // mutate their established chain view. - case update := <-c.filterUpdates: - log.Debugf("Updating chain filter with new UTXO's: %v", - update.newUtxos) - - // First, we'll update the current chain view, by - // adding any new UTXO's, ignoring duplicates int he - // process. - c.filterMtx.Lock() - for _, op := range update.newUtxos { - c.chainFilter[op] = struct{}{} - } - c.filterMtx.Unlock() - - // With our internal chain view update, we'll craft a - // new update to the chainView which includes our new - // UTXO's, and current update height. - rescanUpdate := []neutrino.UpdateOption{ - neutrino.AddOutPoints(update.newUtxos...), - neutrino.Rewind(update.updateHeight), - } - err := c.chainView.Update(rescanUpdate...) - if err != nil { - log.Errorf("unable to update rescan: %v", err) - } - - if update.done != nil { - close(update.done) - } - case <-c.quit: return } @@ -343,27 +311,32 @@ func (c *CfFilteredChainView) FilterBlock(blockHash *chainhash.Hash) (*FilteredB // rewound to ensure all relevant notifications are dispatched. // // NOTE: This is part of the FilteredChainView interface. -func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uint32) error { - doneChan := make(chan struct{}) - update := filterUpdate{ - newUtxos: ops, - updateHeight: updateHeight, - done: doneChan, - } +func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, + updateHeight uint32) error { + log.Debugf("Updating chain filter with new UTXO's: %v", ops) - select { - case c.filterUpdates <- update: - case <-c.quit: - return fmt.Errorf("chain filter shutting down") + // First, we'll update the current chain view, by + // adding any new UTXO's, ignoring duplicates in the + // process. + c.filterMtx.Lock() + for _, op := range ops { + c.chainFilter[op] = struct{}{} } + c.filterMtx.Unlock() - select { - case <-doneChan: - return nil - case <-c.quit: - return fmt.Errorf("chain filter shutting down") + // With our internal chain view update, we'll craft a + // new update to the chainView which includes our new + // UTXO's, and current update height. + rescanUpdate := []neutrino.UpdateOption{ + neutrino.AddOutPoints(ops...), + neutrino.Rewind(updateHeight), + neutrino.DisableDisconnectedNtfns(true), } - + err := c.chainView.Update(rescanUpdate...) + if err != nil { + return fmt.Errorf("unable to update rescan: %v", err) + } + return nil } // FilteredBlocks returns the channel that filtered blocks are to be sent over. @@ -373,7 +346,7 @@ func (c *CfFilteredChainView) UpdateFilter(ops []wire.OutPoint, updateHeight uin // // NOTE: This is part of the FilteredChainView interface. func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { - return c.newBlocks + return c.blockQueue.newBlocks } // DisconnectedBlocks returns a receive only channel which will be sent upon @@ -382,5 +355,5 @@ func (c *CfFilteredChainView) FilteredBlocks() <-chan *FilteredBlock { // // NOTE: This is part of the FilteredChainView interface. func (c *CfFilteredChainView) DisconnectedBlocks() <-chan *FilteredBlock { - return c.staleBlocks + return c.blockQueue.staleBlocks }