lnd: use channel barriers to synchronize on-chain events and a peer's readHandler

This commit is contained in:
Olaoluwa Osuntokun 2016-07-12 17:38:09 -07:00
parent 09f6ecef1f
commit 9b29fa3a52
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 30 additions and 8 deletions

View File

@ -406,8 +406,6 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
return return
} }
// TODO(roasbeef): create new chan barrier
// Now that we have their contribution, we can extract, then send over // Now that we have their contribution, we can extract, then send over
// both the funding out point and our signature for their version of // both the funding out point and our signature for their version of
// the commitment transaction to the remote peer. // the commitment transaction to the remote peer.
@ -419,6 +417,10 @@ func (f *fundingManager) handleFundingResponse(fmsg *fundingResponseMsg) {
return return
} }
// Register a new barrier for this channel to properly synchronize with
// the peer's readHandler once the channel is open.
fmsg.peer.barrierInits <- *outPoint
fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)", fndgLog.Infof("Generated ChannelPoint(%v) for pendingID(%v)",
outPoint, msg.ChannelID) outPoint, msg.ChannelID)
@ -477,7 +479,10 @@ func (f *fundingManager) handleFundingComplete(fmsg *fundingCompleteMsg) {
return return
} }
// TODO(roasbeef): create new chan barrier // Register a new barrier for this channel to properly synchronize with
// the peer's readHandler once the channel is open.
fmsg.peer.barrierInits <- *fundingOut
fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)", fndgLog.Infof("sending signComplete for pendingID(%v) over ChannelPoint(%v)",
fmsg.msg.ChannelID, fundingOut) fmsg.msg.ChannelID, fundingOut)

27
peer.go
View File

@ -107,13 +107,12 @@ type peer struct {
// will be signalled once the channel is fully open. This barrier acts // will be signalled once the channel is fully open. This barrier acts
// as a synchronization point for any incoming/outgoing HTLCs before // as a synchronization point for any incoming/outgoing HTLCs before
// the channel has been fully opened. // the channel has been fully opened.
// TODO(roasbeef): barrier to sync chan open and handling of first htlc barrierMtx sync.RWMutex
// message.
newChanBarriers map[wire.OutPoint]chan struct{} newChanBarriers map[wire.OutPoint]chan struct{}
barrierInits chan wire.OutPoint
// newChannels is used by the fundingManager to send fully opened // newChannels is used by the fundingManager to send fully opened
// channels to the source peer which handled the funding workflow. // channels to the source peer which handled the funding workflow.
// TODO(roasbeef): barrier to block until chan open before update
newChannels chan *lnwallet.LightningChannel newChannels chan *lnwallet.LightningChannel
// localCloseChanReqs is a channel in which any local requests to // localCloseChanReqs is a channel in which any local requests to
@ -161,6 +160,7 @@ func newPeer(conn net.Conn, server *server, net wire.BitcoinNet, inbound bool) (
sendQueue: make(chan outgoinMsg, 1), sendQueue: make(chan outgoinMsg, 1),
outgoingQueue: make(chan outgoinMsg, outgoingQueueLen), outgoingQueue: make(chan outgoinMsg, outgoingQueueLen),
barrierInits: make(chan wire.OutPoint),
newChanBarriers: make(map[wire.OutPoint]chan struct{}), newChanBarriers: make(map[wire.OutPoint]chan struct{}),
activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel), activeChannels: make(map[wire.OutPoint]*lnwallet.LightningChannel),
htlcManagers: make(map[wire.OutPoint]chan lnwire.Message), htlcManagers: make(map[wire.OutPoint]chan lnwire.Message),
@ -473,11 +473,16 @@ out:
snapshots = append(snapshots, snapshot) snapshots = append(snapshots, snapshot)
} }
req.resp <- snapshots req.resp <- snapshots
case pendingChanPoint := <-p.barrierInits:
p.barrierMtx.Lock()
peerLog.Tracef("Creating chan barrier for "+
"ChannelPoint(%v)", pendingChanPoint)
p.newChanBarriers[pendingChanPoint] = make(chan struct{})
p.barrierMtx.Unlock()
case newChan := <-p.newChannels: case newChan := <-p.newChannels:
chanPoint := *newChan.ChannelPoint() chanPoint := *newChan.ChannelPoint()
p.activeChannels[chanPoint] = newChan p.activeChannels[chanPoint] = newChan
// TODO(roasbeef): signal channel barrier
peerLog.Infof("New channel active ChannelPoint(%v) "+ peerLog.Infof("New channel active ChannelPoint(%v) "+
"with peerId(%v)", chanPoint, p.id) "with peerId(%v)", chanPoint, p.id)
@ -485,12 +490,24 @@ out:
// Switch of a new active link. // Switch of a new active link.
chanSnapShot := newChan.StateSnapshot() chanSnapShot := newChan.StateSnapshot()
downstreamLink := make(chan lnwire.Message) downstreamLink := make(chan lnwire.Message)
p.server.htlcSwitch.RegisterLink(p, chanSnapShot, downstreamLink) plexChan := p.server.htlcSwitch.RegisterLink(p,
chanSnapShot, downstreamLink)
// With the channel registered to the HtlcSwitch spawn
// a goroutine to handle commitment updates for this
// new channel.
upstreamLink := make(chan lnwire.Message) upstreamLink := make(chan lnwire.Message)
p.htlcManagers[chanPoint] = upstreamLink p.htlcManagers[chanPoint] = upstreamLink
p.wg.Add(1) p.wg.Add(1)
go p.htlcManager(newChan, downstreamLink, upstreamLink) go p.htlcManager(newChan, downstreamLink, upstreamLink)
// Close the active channel barrier signalling the
// readHandler that commitment related modifications to
// this channel can now proceed.
p.barrierMtx.Lock()
peerLog.Tracef("Closing chan barrier for ChannelPoint(%v)", chanPoint)
close(p.newChanBarriers[chanPoint])
delete(p.newChanBarriers, chanPoint)
p.barrierMtx.Unlock()
case req := <-p.localCloseChanReqs: case req := <-p.localCloseChanReqs:
p.handleLocalClose(req) p.handleLocalClose(req)
case req := <-p.remoteCloseChanReqs: case req := <-p.remoteCloseChanReqs: