chainntfns: eliminate dead lock possibility when dispatching notifications

This commit fixes a possible dead lock when dispatching notifications
caused by the circular communication between the notificationDisptcher
thread and the main notification thread within the btcrpcclient.

Rather than potentially blocking for eternity on a blocking send,
notifications are now instantly handled by appending the notification
on an unbounded queue then launching a goroutine to signal the
dispatcher thread that a new item is available within the queue.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-22 19:12:12 -07:00
parent 7d0e1576ec
commit 913504581d
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
1 changed files with 82 additions and 23 deletions

View File

@ -20,6 +20,22 @@ const (
notifierType = "btcd" notifierType = "btcd"
) )
// chainUpdate encapsulates an update to the current main chain. This struct is
// used as an element within an unbounded queue in order to avoid blocking the
// main rpc dispatch rule.
type chainUpdate struct {
blockHash *wire.ShaHash
blockHeight int32
}
// txUpdate encapsulates a transaction related notification sent from btcd to
// the registered RPC client. This struct is used as an element within an
// unbounded queue in order to avoid blocking the main rpc dispatch rule.
type txUpdate struct {
tx *btcutil.Tx
details *btcjson.BlockDetails
}
// BtcdNotifier implements the ChainNotifier interface using btcd's websockets // BtcdNotifier implements the ChainNotifier interface using btcd's websockets
// notifications. Multiple concurrent clients are supported. All notifications // notifications. Multiple concurrent clients are supported. All notifications
// are achieved via non-blocking sends on client channels. // are achieved via non-blocking sends on client channels.
@ -40,9 +56,15 @@ type BtcdNotifier struct {
blockEpochClients []chan *chainntnfs.BlockEpoch blockEpochClients []chan *chainntnfs.BlockEpoch
connectedBlockHashes chan *blockNtfn
disconnectedBlockHashes chan *blockNtfn disconnectedBlockHashes chan *blockNtfn
relevantTxs chan *btcutil.Tx
chainUpdates []*chainUpdate
chainUpdateSignal chan struct{}
chainUpdateMtx sync.Mutex
txUpdates []*txUpdate
txUpdateSignal chan struct{}
txUpdateMtx sync.Mutex
wg sync.WaitGroup wg sync.WaitGroup
quit chan struct{} quit chan struct{}
@ -62,9 +84,10 @@ func New(config *btcrpcclient.ConnConfig) (*BtcdNotifier, error) {
confNotifications: make(map[wire.ShaHash][]*confirmationsNotification), confNotifications: make(map[wire.ShaHash][]*confirmationsNotification),
confHeap: newConfirmationHeap(), confHeap: newConfirmationHeap(),
connectedBlockHashes: make(chan *blockNtfn, 20),
disconnectedBlockHashes: make(chan *blockNtfn, 20), disconnectedBlockHashes: make(chan *blockNtfn, 20),
relevantTxs: make(chan *btcutil.Tx, 100),
chainUpdateSignal: make(chan struct{}),
txUpdateSignal: make(chan struct{}),
quit: make(chan struct{}), quit: make(chan struct{}),
} }
@ -148,11 +171,21 @@ type blockNtfn struct {
} }
// onBlockConnected implements on OnBlockConnected callback for btcrpcclient. // onBlockConnected implements on OnBlockConnected callback for btcrpcclient.
// Ingesting a block updates the wallet's internal utxo state based on the
// outputs created and destroyed within each block.
func (b *BtcdNotifier) onBlockConnected(hash *wire.ShaHash, height int32, t time.Time) { func (b *BtcdNotifier) onBlockConnected(hash *wire.ShaHash, height int32, t time.Time) {
select { // Append this new chain update to the end of the queue of new chain
case b.connectedBlockHashes <- &blockNtfn{hash, height}: // updates.
case <-b.quit: b.chainUpdateMtx.Lock()
} b.chainUpdates = append(b.chainUpdates, &chainUpdate{hash, height})
b.chainUpdateMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// block update is available. We do this in a new goroutine in order to
// avoid blocking the main loop of the rpc client.
go func() {
b.chainUpdateSignal <- struct{}{}
}()
} }
// onBlockDisconnected implements on OnBlockDisconnected callback for btcrpcclient. // onBlockDisconnected implements on OnBlockDisconnected callback for btcrpcclient.
@ -160,11 +193,19 @@ func (b *BtcdNotifier) onBlockDisconnected(hash *wire.ShaHash, height int32, t t
} }
// onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient. // onRedeemingTx implements on OnRedeemingTx callback for btcrpcclient.
func (b *BtcdNotifier) onRedeemingTx(transaction *btcutil.Tx, details *btcjson.BlockDetails) { func (b *BtcdNotifier) onRedeemingTx(tx *btcutil.Tx, details *btcjson.BlockDetails) {
select { // Append this new transaction update to the end of the queue of new chain
case b.relevantTxs <- transaction: // updates.
case <-b.quit: b.txUpdateMtx.Lock()
} b.txUpdates = append(b.txUpdates, &txUpdate{tx, details})
b.txUpdateMtx.Unlock()
// Launch a goroutine to signal the notification dispatcher that a new
// transaction update is available. We do this in a new goroutine in
// order to avoid blocking the main loop of the rpc client.
go func() {
b.txUpdateSignal <- struct{}{}
}()
} }
// notificationDispatcher is the primary goroutine which handles client // notificationDispatcher is the primary goroutine which handles client
@ -197,20 +238,28 @@ out:
// * notify of negative confirmations // * notify of negative confirmations
chainntnfs.Log.Warnf("Block disconnected from main "+ chainntnfs.Log.Warnf("Block disconnected from main "+
"chain: %v", staleBlockHash) "chain: %v", staleBlockHash)
case connectedBlock := <-b.connectedBlockHashes: case <-b.chainUpdateSignal:
newBlock, err := b.chainConn.GetBlock(connectedBlock.sha) // A new update is available, so pop the new chain
// update from the front of the update queue.
b.chainUpdateMtx.Lock()
update := b.chainUpdates[0]
b.chainUpdates[0] = nil // Set to nil to prevent GC leak.
b.chainUpdates = b.chainUpdates[1:]
b.chainUpdateMtx.Unlock()
newBlock, err := b.chainConn.GetBlock(update.blockHash)
if err != nil { if err != nil {
chainntnfs.Log.Errorf("Unable to get block: %v", err) chainntnfs.Log.Errorf("Unable to get block: %v", err)
continue continue
} }
chainntnfs.Log.Infof("New block: height=%v, sha=%v", chainntnfs.Log.Infof("New block: height=%v, sha=%v",
connectedBlock.height, connectedBlock.sha) update.blockHeight, update.blockHash)
go b.notifyBlockEpochs(connectedBlock.height, go b.notifyBlockEpochs(update.blockHeight,
connectedBlock.sha) update.blockHash)
newHeight := connectedBlock.height newHeight := update.blockHeight
for _, tx := range newBlock.Transactions() { for _, tx := range newBlock.Transactions() {
// Check if the inclusion of this transaction // Check if the inclusion of this transaction
// within a block by itself triggers a block // within a block by itself triggers a block
@ -226,10 +275,20 @@ out:
// chain. Send out any N confirmation notifications // chain. Send out any N confirmation notifications
// which may have been triggered by this new block. // which may have been triggered by this new block.
b.notifyConfs(newHeight) b.notifyConfs(newHeight)
case newSpend := <-b.relevantTxs: case <-b.txUpdateSignal:
// A new update is available, so pop the new chain
// update from the front of the update queue.
b.txUpdateMtx.Lock()
newSpend := b.txUpdates[0]
b.txUpdates[0] = nil // Set to nil to prevent GC leak.
b.txUpdates = b.txUpdates[1:]
b.txUpdateMtx.Unlock()
spendingTx := newSpend.tx
// First, check if this transaction spends an output // First, check if this transaction spends an output
// that has an existing spend notification for it. // that has an existing spend notification for it.
for i, txIn := range newSpend.MsgTx().TxIn { for i, txIn := range spendingTx.MsgTx().TxIn {
prevOut := txIn.PreviousOutPoint prevOut := txIn.PreviousOutPoint
// If this transaction indeed does spend an // If this transaction indeed does spend an
@ -238,12 +297,12 @@ out:
// sending off the details to the notification // sending off the details to the notification
// subscriber. // subscriber.
if ntfn, ok := b.spendNotifications[prevOut]; ok { if ntfn, ok := b.spendNotifications[prevOut]; ok {
spenderSha := newSpend.Sha() spenderSha := newSpend.tx.Sha()
spendDetails := &chainntnfs.SpendDetail{ spendDetails := &chainntnfs.SpendDetail{
SpentOutPoint: ntfn.targetOutpoint, SpentOutPoint: ntfn.targetOutpoint,
SpenderTxHash: spenderSha, SpenderTxHash: spenderSha,
// TODO(roasbeef): copy tx? // TODO(roasbeef): copy tx?
SpendingTx: newSpend.MsgTx(), SpendingTx: spendingTx.MsgTx(),
SpenderInputIndex: uint32(i), SpenderInputIndex: uint32(i),
} }