lnd: implement per-channel state-machine driver within peer

With this commit, the daemon is now able to properly send+redeem
single-hop HTLC’s with another daemon running on the same network.

The htlcManager gains an additional channel from which it reads updates
arriving from either downstream peers or the rpc server. An
htlcManager is spawned for each active channel with the remote peer. As
a result, the readHandler must now de-multiplex any messages which
update a known channel to the proper htlcManager.
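
Under the hood the de-multiplexing is just a map of per-channel mailbox
channels keyed by channel outpoint; the readHandler hunk below is the real
implementation. As a self-contained sketch of the same fan-out pattern (all
names here are illustrative and not taken from the codebase):

package main

import "fmt"

// message is a stand-in for an lnwire commitment-update message; chanID
// plays the role of the channel outpoint used as the routing key.
type message struct {
	chanID  string
	payload string
}

// manager drains a single channel's mailbox, mimicking one htlcManager
// goroutine per active channel.
func manager(id string, mailbox <-chan message, done chan<- struct{}) {
	for m := range mailbox {
		fmt.Printf("manager %s handling %q\n", id, m.payload)
	}
	done <- struct{}{}
}

func main() {
	// One mailbox per active channel, keyed by channel ID.
	mailboxes := map[string]chan message{
		"chan-1": make(chan message, 1),
		"chan-2": make(chan message, 1),
	}
	done := make(chan struct{}, len(mailboxes))
	for id, mb := range mailboxes {
		go manager(id, mb, done)
	}

	// The "readHandler": route each update to the mailbox keyed by its
	// channel ID, dropping updates for channels we don't know about.
	for _, m := range []message{
		{"chan-1", "HTLCAddRequest"},
		{"chan-2", "CommitSignature"},
		{"chan-3", "update for an unknown channel"},
	} {
		if mb, ok := mailboxes[m.chanID]; ok {
			mb <- m
		}
	}

	// Shut the mailboxes down and wait for the managers to exit.
	for _, mb := range mailboxes {
		close(mb)
	}
	for range mailboxes {
		<-done
	}
}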

Batching HTLC add updates with a trickle timer has not yet been
implemented, but will be in the near future along with several other
optimizations.
Olaoluwa Osuntokun 2016-07-12 17:45:29 -07:00
parent af7cfd6c8b
commit 88949e181a
1 changed file with 254 additions and 17 deletions

peer.go

@@ -222,12 +222,14 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
// necessary to properly route multi-hop payments, and forward
// new payments triggered by RPC clients.
downstreamLink := make(chan lnwire.Message)
p.server.htlcSwitch.RegisterLink(p, dbChan.Snapshot(), downstreamLink)
plexChan := p.server.htlcSwitch.RegisterLink(p,
dbChan.Snapshot(), downstreamLink)
// TODO(roasbeef): buffer?
upstreamLink := make(chan lnwire.Message)
p.htlcManagers[chanPoint] = upstreamLink
p.wg.Add(1)
go p.htlcManager(lnChan, downstreamLink, upstreamLink)
go p.htlcManager(lnChan, plexChan, downstreamLink, upstreamLink)
}
return nil
@@ -301,9 +303,6 @@ func (p *peer) readNextMessage() (lnwire.Message, []byte, error) {
//
// NOTE: This method MUST be run as a goroutine.
func (p *peer) readHandler() {
// TODO(roasbeef): set timeout for initial channel request or version
// exchange.
out:
for atomic.LoadInt32(&p.disconnect) == 0 {
nextMsg, _, err := p.readNextMessage()
@@ -312,6 +311,9 @@ out:
break out
}
var isChanUpdate bool
var targetChan *wire.OutPoint
switch msg := nextMsg.(type) {
// TODO(roasbeef): consolidate into predicate (single vs dual)
case *lnwire.SingleFundingRequest:
@@ -326,6 +328,51 @@ out:
p.server.fundingMgr.processFundingOpenProof(msg, p)
case *lnwire.CloseRequest:
p.remoteCloseChanReqs <- msg
// TODO(roasbeef): interface for htlc update msgs
// * .(CommitmentUpdater)
case *lnwire.HTLCAddRequest:
isChanUpdate = true
targetChan = msg.ChannelPoint
case *lnwire.HTLCSettleRequest:
isChanUpdate = true
targetChan = msg.ChannelPoint
case *lnwire.CommitRevocation:
isChanUpdate = true
targetChan = msg.ChannelPoint
case *lnwire.CommitSignature:
isChanUpdate = true
targetChan = msg.ChannelPoint
}
if isChanUpdate {
// We might be receiving an update to a newly funded
// channel in which we were the responder. Therefore
// we need to possibly block until the new channel has
// propagated internally through the system.
p.barrierMtx.RLock()
barrier, ok := p.newChanBarriers[*targetChan]
p.barrierMtx.RUnlock()
if ok {
peerLog.Tracef("waiting for chan barrier "+
"signal for ChannelPoint(%v)", targetChan)
select {
case <-barrier:
case <-p.quit: // TODO(roasbeef): add timer?
break out
}
peerLog.Tracef("barrier for ChannelPoint(%v) "+
"closed", targetChan)
}
// Dispatch the commitment update message to the proper
// active goroutine dedicated to this channel.
targetLink, ok := p.htlcManagers[*targetChan]
if !ok {
peerLog.Errorf("recv'd update for unknown channel %v",
targetChan)
continue
}
targetLink <- nextMsg
}
}
@@ -499,7 +546,8 @@ out:
upstreamLink := make(chan lnwire.Message)
p.htlcManagers[chanPoint] = upstreamLink
p.wg.Add(1)
go p.htlcManager(newChan, downstreamLink, upstreamLink)
go p.htlcManager(newChan, plexChan, downstreamLink, upstreamLink)
// Close the active channel barrier signalling the
// readHandler that commitment related modifications to
// this channel can now proceed.
@@ -656,22 +704,72 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) {
}
}
// htlcManager...
// * communicates with the htlc switch over several channels
// * in handler sends to this goroutine after getting final revocation
// * has timeouts etc, to send back on queue handler in case of timeout
// TODO(roabseef): split downstream link into two chans (send vs recv)
// commitmentState is the volatile+persistent state of an active channel's
commitment update state-machine. This struct is used by each htlcManager to
// save meta-state required for proper functioning.
type commitmentState struct {
pendingLogLen uint32
htlcsToSettle [][32]byte
sigPending bool
channel *lnwallet.LightningChannel
chanPoint *wire.OutPoint
}
// htlcManager is the primary goroutine which drives a channel's commitment
// update state-machine in response to messages received via several channels.
// The htlcManager reads messages from the upstream (remote) peer, and also
// from several possible downstream channels managed by the htlcSwitch. In the
event that an htlc needs to be forwarded, the send-only htlcPlex chan is
// used which sends htlc packets to the switch for forwarding. Additionally,
// the htlcManager handles acting upon all timeouts for any active HTLC's,
// manages the channel's revocation window, and also the htlc trickle
queue+timer for this active channel.
func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
downstreamLink chan lnwire.Message, upstreamLink chan lnwire.Message) {
htlcPlex chan<- *htlcPacket, downstreamLink <-chan lnwire.Message,
upstreamLink <-chan lnwire.Message) {
peerLog.Tracef("htlc manager for channel %v started",
channel.ChannelPoint())
chanStats := channel.StateSnapshot()
peerLog.Tracef("HTLC manager for ChannelPoint(%v) started, "+
"our_balance=%v, their_balance=%v, chain_height=%v",
channel.ChannelPoint(), chanStats.LocalBalance,
chanStats.RemoteBalance, chanStats.NumUpdates)
// A new session for this active channel has just started, therefore we
// need to send our initial revocation window to the remote peer.
for i := 0; i < lnwallet.InitialRevocationWindow; i++ {
rev, err := channel.ExtendRevocationWindow()
if err != nil {
peerLog.Errorf("unable to expand revocation window: %v", err)
continue
}
p.queueMsg(rev, nil)
}
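// Track this channel's pending settle list and outstanding-signature flag
// between state transitions.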
state := &commitmentState{
channel: channel,
chanPoint: channel.ChannelPoint(),
}
out:
for {
select {
case htlcPkt := <-downstreamLink:
fmt.Println(htlcPkt)
case msg := <-downstreamLink:
switch htlc := msg.(type) {
case *lnwire.HTLCAddRequest:
// A new payment has been initiated via the
// downstream channel, so we add the new HTLC
// to our local log, then update the commitment
// chains.
channel.AddHTLC(htlc, false)
p.queueMsg(htlc, nil)
// TODO(roasbeef): batch trickle timer + cap
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
}
}
case msg, ok := <-upstreamLink:
// If the upstream message link is closed, this signals
// that the channel itself is being closed, therefore
@@ -680,7 +778,120 @@ out:
break out
}
fmt.Println(msg)
switch htlcPkt := msg.(type) {
// TODO(roasbeef): timeouts
case *lnwire.HTLCAddRequest:
// We just received an add request from an
// upstream peer, so we add it to our state
// machine, then add the HTLC to our "settle"
// list in the event that we know the pre-image
channel.AddHTLC(htlcPkt, true)
rHash := htlcPkt.RedemptionHashes[0]
if invoice, found := p.server.invoices.lookupInvoice(rHash); found {
// TODO(roasbeef): check value
// * onion layer strip should also be before invoice lookup
pre := invoice.paymentPreimage
state.htlcsToSettle = append(state.htlcsToSettle, pre)
}
case *lnwire.HTLCSettleRequest:
// TODO(roasbeef): this assumes no "multi-sig"
pre := htlcPkt.RedemptionProofs[0]
if _, err := channel.SettleHTLC(pre, true); err != nil {
peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
continue
}
case *lnwire.CommitSignature:
// We just received a new update to our local
// commitment chain, validate this new
// commitment, closing the link if invalid.
// TODO(roasbeef): use uint64 for indexes?
logIndex := uint32(htlcPkt.LogIndex)
sig := htlcPkt.CommitSig.Serialize()
if err := channel.ReceiveNewCommitment(sig, logIndex); err != nil {
peerLog.Errorf("unable to accept new commitment: %v", err)
continue
}
// If we didn't initiate this state transition,
// then we'll update the remote commitment
// chain with a new commitment. Otherwise, we
// can reset the pending bit as we received the
// signature we were expecting.
if !state.sigPending {
// TODO(roasbeef): may not always want to *immediatly*
// sign next commitment.
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
continue
}
} else {
state.sigPending = false
}
// Finally, since we just accepted a new state,
// send the remote peer a revocation for our
// prior state.
nextRevocation, err := channel.RevokeCurrentCommitment()
if err != nil {
peerLog.Errorf("unable to revoke current commitment: %v", err)
continue
}
p.queueMsg(nextRevocation, nil)
case *lnwire.CommitRevocation:
// We've received a revocation from the remote
// chain, if valid, this moves the remote chain
// forward, and expands our revocation window.
htlcsToForward, err := channel.ReceiveRevocation(htlcPkt)
if err != nil {
peerLog.Errorf("unable to accept revocation: %v", err)
continue
}
// TODO(roasbeef): send the locked-in HTLC's
// over the plex chan to the switch.
peerLog.Debugf("htlcs ready to forward: %v",
spew.Sdump(htlcsToForward))
// A full state transition has been completed,
// if we don't need to settle any HTLC's, then
// we're done.
if len(state.htlcsToSettle) == 0 {
continue
}
// Otherwise, we have some pending HTLC's which
// we can pull funds from, thereby settling.
peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle))
for _, pre := range state.htlcsToSettle {
// Add each HTLC settle update to the
// channel's state update log, also
// sending the log update to the remote
// party.
logIndex, err := channel.SettleHTLC(pre, false)
if err != nil {
peerLog.Errorf("unable to settle htlc: %v", err)
continue
}
settleMsg := &lnwire.HTLCSettleRequest{
ChannelPoint: state.chanPoint,
HTLCKey: lnwire.HTLCKey(logIndex),
RedemptionProofs: [][32]byte{pre},
}
p.queueMsg(settleMsg, nil)
}
// With all the settle updates added to the
// local and remote HTLC logs, initiate a state
// transition by updating the remote commitment
// chain.
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
continue
}
state.htlcsToSettle = nil
}
case <-p.quit:
break out
}
@@ -689,4 +900,30 @@ out:
p.wg.Done()
}
// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (p *peer) updateCommitTx(state *commitmentState) error {
sigTheirs, logIndexTheirs, err := state.channel.SignNextCommitment()
if err != nil {
return fmt.Errorf("unable to sign next commitment: %v", err)
}
parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256())
if err != nil {
return fmt.Errorf("unable to parse sig: %v", err)
}
commitSig := &lnwire.CommitSignature{
ChannelPoint: state.chanPoint,
CommitSig: parsedSig,
LogIndex: uint64(logIndexTheirs),
}
p.queueMsg(commitSig, nil)
state.sigPending = true
return nil
}
// TODO(roasbeef): make all start/stop mutexes a CAS