diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index 0ef874aa..003bb635 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -72,13 +72,8 @@ type BtcdNotifier struct { disconnectedBlockHashes chan *blockNtfn - chainUpdates []*chainUpdate - chainUpdateSignal chan struct{} - chainUpdateMtx sync.Mutex - - txUpdates []*txUpdate - txUpdateSignal chan struct{} - txUpdateMtx sync.Mutex + chainUpdates *chainntnfs.ConcurrentQueue + txUpdates *chainntnfs.ConcurrentQueue wg sync.WaitGroup quit chan struct{} @@ -104,8 +99,8 @@ func New(config *rpcclient.ConnConfig) (*BtcdNotifier, error) { disconnectedBlockHashes: make(chan *blockNtfn, 20), - chainUpdateSignal: make(chan struct{}), - txUpdateSignal: make(chan struct{}), + chainUpdates: chainntnfs.NewConcurrentQueue(10), + txUpdates: chainntnfs.NewConcurrentQueue(10), quit: make(chan struct{}), } @@ -151,6 +146,9 @@ func (b *BtcdNotifier) Start() error { return err } + b.chainUpdates.Start() + b.txUpdates.Start() + b.wg.Add(1) go b.notificationDispatcher(currentHeight) @@ -171,6 +169,9 @@ func (b *BtcdNotifier) Stop() error { close(b.quit) b.wg.Wait() + b.chainUpdates.Stop() + b.txUpdates.Stop() + // Notify all pending clients of our shutdown by closing the related // notification channels. for _, spendClients := range b.spendNotifications { @@ -204,16 +205,7 @@ type blockNtfn struct { func (b *BtcdNotifier) onBlockConnected(hash *chainhash.Hash, height int32, t time.Time) { // Append this new chain update to the end of the queue of new chain // updates. - b.chainUpdateMtx.Lock() - b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height}) - b.chainUpdateMtx.Unlock() - - // Launch a goroutine to signal the notification dispatcher that a new - // block update is available. We do this in a new goroutine in order to - // avoid blocking the main loop of the rpc client. - go func() { - b.chainUpdateSignal <- struct{}{} - }() + b.chainUpdates.ChanIn() <- &chainUpdate{hash, height} } // onBlockDisconnected implements on OnBlockDisconnected callback for rpcclient. @@ -224,16 +216,7 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *chainhash.Hash, height int32, t func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) { // Append this new transaction update to the end of the queue of new // chain updates. - b.txUpdateMtx.Lock() - b.txUpdates = append(b.txUpdates, &txUpdate{tx, details}) - b.txUpdateMtx.Unlock() - - // Launch a goroutine to signal the notification dispatcher that a new - // transaction update is available. We do this in a new goroutine in - // order to avoid blocking the main loop of the rpc client. - go func() { - b.txUpdateSignal <- struct{}{} - }() + b.txUpdates.ChanIn() <- &txUpdate{tx, details} } // notificationDispatcher is the primary goroutine which handles client @@ -314,15 +297,8 @@ out: chainntnfs.Log.Warnf("Block disconnected from main "+ "chain: %v", staleBlockHash) - case <-b.chainUpdateSignal: - // A new update is available, so pop the new chain - // update from the front of the update queue. - b.chainUpdateMtx.Lock() - update := b.chainUpdates[0] - b.chainUpdates[0] = nil // Set to nil to prevent GC leak. - b.chainUpdates = b.chainUpdates[1:] - b.chainUpdateMtx.Unlock() - + case item := <-b.chainUpdates.ChanOut(): + update := item.(*chainUpdate) currentHeight = update.blockHeight newBlock, err := b.chainConn.GetBlock(update.blockHash) @@ -355,15 +331,8 @@ out: // which may have been triggered by this new block. b.notifyConfs(newHeight) - case <-b.txUpdateSignal: - // A new update is available, so pop the new chain - // update from the front of the update queue. - b.txUpdateMtx.Lock() - newSpend := b.txUpdates[0] - b.txUpdates[0] = nil // Set to nil to prevent GC leak. - b.txUpdates = b.txUpdates[1:] - b.txUpdateMtx.Unlock() - + case item := <-b.txUpdates.ChanOut(): + newSpend := item.(*txUpdate) spendingTx := newSpend.tx // First, check if this transaction spends an output