diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 5eb3ff98..9fd15b03 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -174,6 +174,9 @@ func (b *BitcoindNotifier) Stop() error { } } for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + close(epochClient.epochChan) } b.txConfNotifier.TearDown() @@ -213,7 +216,13 @@ out: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - // First, close the cancel channel for this + // First, we'll lookup the original + // registration in order to stop the active + // queue goroutine. + reg := b.blockEpochClients[msg.epochID] + reg.epochQueue.Stop() + + // Next, close the cancel channel for this // specific client, and wait for the client to // exit. close(b.blockEpochClients[msg.epochID].cancelChan) @@ -441,27 +450,14 @@ func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. } for _, epochClient := range b.blockEpochClients { - b.wg.Add(1) - epochClient.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}, - clientWg *sync.WaitGroup) { + select { - // TODO(roasbeef): move to goroutine per client, use sync queue + case epochClient.epochQueue.ChanIn() <- epoch: - defer clientWg.Done() - defer b.wg.Done() + case <-epochClient.cancelChan: - select { - case ntfnChan <- epoch: - - case <-cancelChan: - return - - case <-b.quit: - return - } - - }(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg) + case <-b.quit: + } } } @@ -628,6 +624,8 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + epochQueue *chainntnfs.ConcurrentQueue + cancelChan chan struct{} wg sync.WaitGroup @@ -643,22 +641,58 @@ type epochCancel struct { // caller to receive notifications, of each new block connected to the main // chain. func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - registration := &blockEpochRegistration{ + reg := &blockEpochRegistration{ + epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), } + reg.epochQueue.Start() + + // Before we send the request to the main goroutine, we'll launch a new + // goroutine to proxy items added to our queue to the client itself. + // This ensures that all notifications are received *in order*. + reg.wg.Add(1) + go func() { + defer reg.wg.Done() + + for { + select { + case ntfn := <-reg.epochQueue.ChanOut(): + blockNtfn := ntfn.(*chainntnfs.BlockEpoch) + select { + case reg.epochChan <- blockNtfn: + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + } + }() select { case <-b.quit: + // As we're exiting before the registration could be sent, + // we'll stop the queue now ourselves. + reg.epochQueue.Stop() + return nil, errors.New("chainntnfs: system interrupt while " + "attempting to register for block epoch notification.") - case b.notificationRegistry <- registration: + case b.notificationRegistry <- reg: return &chainntnfs.BlockEpochEvent{ - Epochs: registration.epochChan, + Epochs: reg.epochChan, Cancel: func() { cancel := &epochCancel{ - epochID: registration.epochID, + epochID: reg.epochID, } // Submit epoch cancellation to notification dispatcher. @@ -668,7 +702,7 @@ func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent // closed before yielding to caller. for { select { - case _, ok := <-registration.epochChan: + case _, ok := <-reg.epochChan: if !ok { return } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index a5ff7ec5..2f77e308 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -184,6 +184,9 @@ func (b *BtcdNotifier) Stop() error { } } for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + close(epochClient.epochChan) } b.txConfNotifier.TearDown() @@ -247,7 +250,13 @@ out: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - // First, close the cancel channel for this + // First, we'll lookup the original + // registration in order to stop the active + // queue goroutine. + reg := b.blockEpochClients[msg.epochID] + reg.epochQueue.Stop() + + // Next, close the cancel channel for this // specific client, and wait for the client to // exit. close(b.blockEpochClients[msg.epochID].cancelChan) @@ -260,7 +269,6 @@ out: // cancelled. close(b.blockEpochClients[msg.epochID].epochChan) delete(b.blockEpochClients, msg.epochID) - } case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { @@ -462,27 +470,14 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash } for _, epochClient := range b.blockEpochClients { - b.wg.Add(1) - epochClient.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}, - clientWg *sync.WaitGroup) { + select { - // TODO(roasbeef): move to goroutine per client, use sync queue + case epochClient.epochQueue.ChanIn() <- epoch: - defer clientWg.Done() - defer b.wg.Done() + case <-epochClient.cancelChan: - select { - case ntfnChan <- epoch: - - case <-cancelChan: - return - - case <-b.quit: - return - } - - }(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg) + case <-b.quit: + } } } @@ -631,6 +626,8 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + epochQueue *chainntnfs.ConcurrentQueue + cancelChan chan struct{} wg sync.WaitGroup @@ -646,32 +643,69 @@ type epochCancel struct { // caller to receive notifications, of each new block connected to the main // chain. func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - registration := &blockEpochRegistration{ + reg := &blockEpochRegistration{ + epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), } + reg.epochQueue.Start() + + // Before we send the request to the main goroutine, we'll launch a new + // goroutine to proxy items added to our queue to the client itself. + // This ensures that all notifications are received *in order*. + reg.wg.Add(1) + go func() { + defer reg.wg.Done() + + for { + select { + case ntfn := <-reg.epochQueue.ChanOut(): + blockNtfn := ntfn.(*chainntnfs.BlockEpoch) + select { + case reg.epochChan <- blockNtfn: + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + } + }() select { case <-b.quit: + // As we're exiting before the registration could be sent, + // we'll stop the queue now ourselves. + reg.epochQueue.Stop() + return nil, errors.New("chainntnfs: system interrupt while " + "attempting to register for block epoch notification.") - case b.notificationRegistry <- registration: + case b.notificationRegistry <- reg: return &chainntnfs.BlockEpochEvent{ - Epochs: registration.epochChan, + Epochs: reg.epochChan, Cancel: func() { cancel := &epochCancel{ - epochID: registration.epochID, + epochID: reg.epochID, } // Submit epoch cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: - // Cancellation is being handled, drain the epoch channel until it is - // closed before yielding to caller. + // Cancellation is being handled, drain + // the epoch channel until it is closed + // before yielding to caller. for { select { - case _, ok := <-registration.epochChan: + case _, ok := <-reg.epochChan: if !ok { return } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index a75bde2e..8324ab1d 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -181,6 +181,9 @@ func (n *NeutrinoNotifier) Stop() error { } } for _, epochClient := range n.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + close(epochClient.epochChan) } n.txConfNotifier.TearDown() @@ -257,7 +260,13 @@ func (n *NeutrinoNotifier) notificationDispatcher() { chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - // First, close the cancel channel for this + // First, we'll lookup the original + // registration in order to stop the active + // queue goroutine. + reg := n.blockEpochClients[msg.epochID] + reg.epochQueue.Stop() + + // Next, close the cancel channel for this // specific client, and wait for the client to // exit. close(n.blockEpochClients[msg.epochID].cancelChan) @@ -715,6 +724,8 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + epochQueue *chainntnfs.ConcurrentQueue + cancelChan chan struct{} wg sync.WaitGroup @@ -729,22 +740,58 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller // to receive notifications, of each new block connected to the main chain. func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - registration := &blockEpochRegistration{ + reg := &blockEpochRegistration{ + epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&n.epochClientCounter, 1), } + reg.epochQueue.Start() + + // Before we send the request to the main goroutine, we'll launch a new + // goroutine to proxy items added to our queue to the client itself. + // This ensures that all notifications are received *in order*. + reg.wg.Add(1) + go func() { + defer reg.wg.Done() + + for { + select { + case ntfn := <-reg.epochQueue.ChanOut(): + blockNtfn := ntfn.(*chainntnfs.BlockEpoch) + select { + case reg.epochChan <- blockNtfn: + + case <-reg.cancelChan: + return + + case <-n.quit: + return + } + + case <-reg.cancelChan: + return + + case <-n.quit: + return + } + } + }() select { case <-n.quit: + // As we're exiting before the registration could be sent, + // we'll stop the queue now ourselves. + reg.epochQueue.Stop() + return nil, errors.New("chainntnfs: system interrupt while " + "attempting to register for block epoch notification.") - case n.notificationRegistry <- registration: + case n.notificationRegistry <- reg: return &chainntnfs.BlockEpochEvent{ - Epochs: registration.epochChan, + Epochs: reg.epochChan, Cancel: func() { cancel := &epochCancel{ - epochID: registration.epochID, + epochID: reg.epochID, } // Submit epoch cancellation to notification dispatcher. @@ -754,7 +801,7 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent // closed before yielding to caller. for { select { - case _, ok := <-registration.epochChan: + case _, ok := <-reg.epochChan: if !ok { return }