chainntnfs/btcd+neutrino: sync epoch cancel

This commit is contained in:
Conner Fromknecht 2017-07-29 20:28:48 -07:00 committed by Olaoluwa Osuntokun
parent a9b1af4c73
commit 9f85eadde1
2 changed files with 40 additions and 43 deletions

View File

@ -257,9 +257,6 @@ out:
delete(b.spendNotifications[msg.op], msg.spendID)
}
// Signal that it's safe yield from Cancel to application.
close(msg.done)
case *epochCancel:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
@ -278,8 +275,6 @@ out:
close(b.blockEpochClients[msg.epochID].epochChan)
delete(b.blockEpochClients, msg.epochID)
// Signal that it's safe yield from Cancel to application.
close(msg.done)
}
case registerMsg := <-b.notificationRegistry:
switch msg := registerMsg.(type) {
@ -625,8 +620,6 @@ type spendCancel struct {
// spendID the ID of the notification to cancel.
spendID uint64
done chan struct{}
}
// RegisterSpendNtfn registers an intent to be notified once the target
@ -685,17 +678,21 @@ func (b *BtcdNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
done: make(chan struct{}),
}
// Submit spend cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-b.quit:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
if !ok {
return
}
case <-b.quit:
}
}
case <-b.quit:
}
@ -755,8 +752,6 @@ type blockEpochRegistration struct {
// cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
epochID uint64
done chan struct{}
}
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the
@ -779,17 +774,21 @@ func (b *BtcdNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent, er
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
done: make(chan struct{}),
}
// Submit epoch cancellation to notification dispatcher.
select {
case b.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-b.quit:
// Cancellation is being handled, drain the epoch channel until it is
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
if !ok {
return
}
case <-b.quit:
}
}
case <-b.quit:
}

View File

@ -268,9 +268,6 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
delete(n.spendNotifications[msg.op], msg.spendID)
}
// Signal that it's safe yield from Cancel to application.
close(msg.done)
case *epochCancel:
chainntnfs.Log.Infof("Cancelling epoch "+
"notification, epoch_id=%v", msg.epochID)
@ -288,9 +285,6 @@ func (n *NeutrinoNotifier) notificationDispatcher() {
// cancelled.
close(n.blockEpochClients[msg.epochID].epochChan)
delete(n.blockEpochClients, msg.epochID)
// Signal that it's safe yield from Cancel to application.
close(msg.done)
}
case registerMsg := <-n.notificationRegistry:
@ -687,8 +681,6 @@ type spendCancel struct {
// spendID the ID of the notification to cancel.
spendID uint64
done chan struct{}
}
// RegisterSpendNtfn registers an intent to be notified once the target
@ -716,17 +708,21 @@ func (n *NeutrinoNotifier) RegisterSpendNtfn(outpoint *wire.OutPoint,
cancel := &spendCancel{
op: *outpoint,
spendID: ntfn.spendID,
done: make(chan struct{}),
}
// Submit spend cancellation to notification dispatcher.
select {
case n.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-n.quit:
// Cancellation is being handled, drain the spend chan until it is
// closed before yielding to the caller.
for {
select {
case _, ok := <-ntfn.spendChan:
if !ok {
return
}
case <-n.quit:
}
}
case <-n.quit:
}
@ -860,8 +856,6 @@ type blockEpochRegistration struct {
// to cancel an outstanding epoch notification that has yet to be dispatched.
type epochCancel struct {
epochID uint64
done chan struct{}
}
// RegisterBlockEpochNtfn returns a BlockEpochEvent which subscribes the caller
@ -883,17 +877,21 @@ func (n *NeutrinoNotifier) RegisterBlockEpochNtfn() (*chainntnfs.BlockEpochEvent
Cancel: func() {
cancel := &epochCancel{
epochID: registration.epochID,
done: make(chan struct{}),
}
// Submit epoch cancellation to notification dispatcher.
select {
case n.notificationCancels <- cancel:
// Cancellation is being handled, wait for close before yielding to
// caller.
select {
case <-cancel.done:
case <-n.quit:
// Cancellation is being handled, drain the epoch channel until it is
// closed before yielding to caller.
for {
select {
case _, ok := <-registration.epochChan:
if !ok {
return
}
case <-n.quit:
}
}
case <-n.quit:
}