From 9c18c3d9a41ee9b95575a0ee30a771557d6add3c Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Fri, 9 Feb 2018 16:13:21 -0800 Subject: [PATCH] chainntnfs: ensure all block epoch notifications are sent *in order* MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit In this commit, we fix a lingering bug related to the way that we deliver block epoch notifications to end users. Before this commit, we would launch a new goroutine for *each block*. This was done in order to ensure that the notification dispatch wouldn’t block the main goroutine that was dispatching the notifications. This method achieved the goal, but had a nasty side effect that the goroutines could be re-ordered during scheduling, meaning that in the case of fast successive blocks, notifications would be delivered out of order. Receiving out-of-order notifications is either disallowed, or can cause sub-systems that rely on these notifications to get into weird states. In order to fix this issue, we’ll no longer launch a new goroutine to deliver each notification to an awaiting client. Instead, each client will now gain a concurrent in-order queue for notification delivery. Due to the internal design of chainntnfs.ConcurrentQueue, the caller should never block, yet the receivers will receive notifications in order. This change solves the re-ordering issue and also minimizes the number of goroutines that we’ll create in order to deliver block epoch notifications. 
--- chainntnfs/bitcoindnotify/bitcoind.go | 82 +++++++++++++++++-------- chainntnfs/btcdnotify/btcd.go | 88 +++++++++++++++++++-------- chainntnfs/neutrinonotify/neutrino.go | 59 ++++++++++++++++-- 3 files changed, 172 insertions(+), 57 deletions(-) diff --git a/chainntnfs/bitcoindnotify/bitcoind.go b/chainntnfs/bitcoindnotify/bitcoind.go index 5eb3ff98..9fd15b03 100644 --- a/chainntnfs/bitcoindnotify/bitcoind.go +++ b/chainntnfs/bitcoindnotify/bitcoind.go @@ -174,6 +174,9 @@ func (b *BitcoindNotifier) Stop() error { } } for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + close(epochClient.epochChan) } b.txConfNotifier.TearDown() @@ -213,7 +216,13 @@ out: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - // First, close the cancel channel for this + // First, we'll lookup the original + // registration in order to stop the active + // queue goroutine. + reg := b.blockEpochClients[msg.epochID] + reg.epochQueue.Stop() + + // Next, close the cancel channel for this // specific client, and wait for the client to // exit. close(b.blockEpochClients[msg.epochID].cancelChan) @@ -441,27 +450,14 @@ func (b *BitcoindNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash. 
} for _, epochClient := range b.blockEpochClients { - b.wg.Add(1) - epochClient.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}, - clientWg *sync.WaitGroup) { + select { - // TODO(roasbeef): move to goroutine per client, use sync queue + case epochClient.epochQueue.ChanIn() <- epoch: - defer clientWg.Done() - defer b.wg.Done() + case <-epochClient.cancelChan: - select { - case ntfnChan <- epoch: - - case <-cancelChan: - return - - case <-b.quit: - return - } - - }(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg) + case <-b.quit: + } } } @@ -628,6 +624,8 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + epochQueue *chainntnfs.ConcurrentQueue + cancelChan chan struct{} wg sync.WaitGroup @@ -643,22 +641,58 @@ type epochCancel struct { // caller to receive notifications, of each new block connected to the main // chain. func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - registration := &blockEpochRegistration{ + reg := &blockEpochRegistration{ + epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), } + reg.epochQueue.Start() + + // Before we send the request to the main goroutine, we'll launch a new + // goroutine to proxy items added to our queue to the client itself. + // This ensures that all notifications are received *in order*. + reg.wg.Add(1) + go func() { + defer reg.wg.Done() + + for { + select { + case ntfn := <-reg.epochQueue.ChanOut(): + blockNtfn := ntfn.(*chainntnfs.BlockEpoch) + select { + case reg.epochChan <- blockNtfn: + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + } + }() select { case <-b.quit: + // As we're exiting before the registration could be sent, + // we'll stop the queue now ourselves. 
+ reg.epochQueue.Stop() + return nil, errors.New("chainntnfs: system interrupt while " + "attempting to register for block epoch notification.") - case b.notificationRegistry <- registration: + case b.notificationRegistry <- reg: return &chainntnfs.BlockEpochEvent{ - Epochs: registration.epochChan, + Epochs: reg.epochChan, Cancel: func() { cancel := &epochCancel{ - epochID: registration.epochID, + epochID: reg.epochID, } // Submit epoch cancellation to notification dispatcher. @@ -668,7 +702,7 @@ func (b *BitcoindNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent // closed before yielding to caller. for { select { - case _, ok := <-registration.epochChan: + case _, ok := <-reg.epochChan: if !ok { return } diff --git a/chainntnfs/btcdnotify/btcd.go b/chainntnfs/btcdnotify/btcd.go index a5ff7ec5..2f77e308 100644 --- a/chainntnfs/btcdnotify/btcd.go +++ b/chainntnfs/btcdnotify/btcd.go @@ -184,6 +184,9 @@ func (b *BtcdNotifier) Stop() error { } } for _, epochClient := range b.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + close(epochClient.epochChan) } b.txConfNotifier.TearDown() @@ -247,7 +250,13 @@ out: chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - // First, close the cancel channel for this + // First, we'll lookup the original + // registration in order to stop the active + // queue goroutine. + reg := b.blockEpochClients[msg.epochID] + reg.epochQueue.Stop() + + // Next, close the cancel channel for this // specific client, and wait for the client to // exit. close(b.blockEpochClients[msg.epochID].cancelChan) @@ -260,7 +269,6 @@ out: // cancelled. 
close(b.blockEpochClients[msg.epochID].epochChan) delete(b.blockEpochClients, msg.epochID) - } case registerMsg := <-b.notificationRegistry: switch msg := registerMsg.(type) { @@ -462,27 +470,14 @@ func (b *BtcdNotifier) notifyBlockEpochs(newHeight int32, newSha *chainhash.Hash } for _, epochClient := range b.blockEpochClients { - b.wg.Add(1) - epochClient.wg.Add(1) - go func(ntfnChan chan *chainntnfs.BlockEpoch, cancelChan chan struct{}, - clientWg *sync.WaitGroup) { + select { - // TODO(roasbeef): move to goroutine per client, use sync queue + case epochClient.epochQueue.ChanIn() <- epoch: - defer clientWg.Done() - defer b.wg.Done() + case <-epochClient.cancelChan: - select { - case ntfnChan <- epoch: - - case <-cancelChan: - return - - case <-b.quit: - return - } - - }(epochClient.epochChan, epochClient.cancelChan, &epochClient.wg) + case <-b.quit: + } } } @@ -631,6 +626,8 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + epochQueue *chainntnfs.ConcurrentQueue + cancelChan chan struct{} wg sync.WaitGroup @@ -646,32 +643,69 @@ type epochCancel struct { // caller to receive notifications, of each new block connected to the main // chain. func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - registration := &blockEpochRegistration{ + reg := &blockEpochRegistration{ + epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&b.epochClientCounter, 1), } + reg.epochQueue.Start() + + // Before we send the request to the main goroutine, we'll launch a new + // goroutine to proxy items added to our queue to the client itself. + // This ensures that all notifications are received *in order*. 
+ reg.wg.Add(1) + go func() { + defer reg.wg.Done() + + for { + select { + case ntfn := <-reg.epochQueue.ChanOut(): + blockNtfn := ntfn.(*chainntnfs.BlockEpoch) + select { + case reg.epochChan <- blockNtfn: + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + + case <-reg.cancelChan: + return + + case <-b.quit: + return + } + } + }() select { case <-b.quit: + // As we're exiting before the registration could be sent, + // we'll stop the queue now ourselves. + reg.epochQueue.Stop() + return nil, errors.New("chainntnfs: system interrupt while " + "attempting to register for block epoch notification.") - case b.notificationRegistry <- registration: + case b.notificationRegistry <- reg: return &chainntnfs.BlockEpochEvent{ - Epochs: registration.epochChan, + Epochs: reg.epochChan, Cancel: func() { cancel := &epochCancel{ - epochID: registration.epochID, + epochID: reg.epochID, } // Submit epoch cancellation to notification dispatcher. select { case b.notificationCancels <- cancel: - // Cancellation is being handled, drain the epoch channel until it is - // closed before yielding to caller. + // Cancellation is being handled, drain + // the epoch channel until it is closed + // before yielding to caller. 
for { select { - case _, ok := <-registration.epochChan: + case _, ok := <-reg.epochChan: if !ok { return } diff --git a/chainntnfs/neutrinonotify/neutrino.go b/chainntnfs/neutrinonotify/neutrino.go index a75bde2e..8324ab1d 100644 --- a/chainntnfs/neutrinonotify/neutrino.go +++ b/chainntnfs/neutrinonotify/neutrino.go @@ -181,6 +181,9 @@ func (n *NeutrinoNotifier) Stop() error { } } for _, epochClient := range n.blockEpochClients { + close(epochClient.cancelChan) + epochClient.wg.Wait() + close(epochClient.epochChan) } n.txConfNotifier.TearDown() @@ -257,7 +260,13 @@ func (n *NeutrinoNotifier) notificationDispatcher() { chainntnfs.Log.Infof("Cancelling epoch "+ "notification, epoch_id=%v", msg.epochID) - // First, close the cancel channel for this + // First, we'll lookup the original + // registration in order to stop the active + // queue goroutine. + reg := n.blockEpochClients[msg.epochID] + reg.epochQueue.Stop() + + // Next, close the cancel channel for this // specific client, and wait for the client to // exit. close(n.blockEpochClients[msg.epochID].cancelChan) @@ -715,6 +724,8 @@ type blockEpochRegistration struct { epochChan chan *chainntnfs.BlockEpoch + epochQueue *chainntnfs.ConcurrentQueue + cancelChan chan struct{} wg sync.WaitGroup @@ -729,22 +740,58 @@ type epochCancel struct { // RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller // to receive notifications, of each new block connected to the main chain. func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, error) { - registration := &blockEpochRegistration{ + reg := &blockEpochRegistration{ + epochQueue: chainntnfs.NewConcurrentQueue(20), epochChan: make(chan *chainntnfs.BlockEpoch, 20), cancelChan: make(chan struct{}), epochID: atomic.AddUint64(&n.epochClientCounter, 1), } + reg.epochQueue.Start() + + // Before we send the request to the main goroutine, we'll launch a new + // goroutine to proxy items added to our queue to the client itself. 
+ // This ensures that all notifications are received *in order*. + reg.wg.Add(1) + go func() { + defer reg.wg.Done() + + for { + select { + case ntfn := <-reg.epochQueue.ChanOut(): + blockNtfn := ntfn.(*chainntnfs.BlockEpoch) + select { + case reg.epochChan <- blockNtfn: + + case <-reg.cancelChan: + return + + case <-n.quit: + return + } + + case <-reg.cancelChan: + return + + case <-n.quit: + return + } + } + }() select { case <-n.quit: + // As we're exiting before the registration could be sent, + // we'll stop the queue now ourselves. + reg.epochQueue.Stop() + return nil, errors.New("chainntnfs: system interrupt while " + "attempting to register for block epoch notification.") - case n.notificationRegistry <- registration: + case n.notificationRegistry <- reg: return &chainntnfs.BlockEpochEvent{ - Epochs: registration.epochChan, + Epochs: reg.epochChan, Cancel: func() { cancel := &epochCancel{ - epochID: registration.epochID, + epochID: reg.epochID, } // Submit epoch cancellation to notification dispatcher. @@ -754,7 +801,7 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent // closed before yielding to caller. for { select { - case _, ok := <-registration.epochChan: + case _, ok := <-reg.epochChan: if !ok { return }