peer: update channel commitment updates to match spec

This commit modifies a peer’s htlcManager goroutine in order to
properly implement the new state machine defined by the specification.
The major change is that the new state machine no longer limits the
number of unrevoked commitment states we can have outstanding. As a
result, we no longer need to track how many outstanding changes we
have; we only need to track whether or not a change is pending. This
simplifies the logic a bit.

Additionally, when we receive a new signature, we FIRST send a
RevokeAndAck, and THEN determine whether we need to send a signature
in response. This is the major change to the state machine from the
PoV of the htlcManager. Previously, the order was flipped.
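
For illustration, the new receive path looks roughly like this. This is a minimal sketch with stand-in types and helpers (`chanState`, `link`, `send`, `signAndSend` are all hypothetical), not the actual peer.go code:

```go
package sketch

// chanState stands in for the channel state machine methods used here.
type chanState interface {
	ReceiveNewCommitment(sig []byte) error
	RevokeCurrentCommitment() (revMsg interface{}, err error)
}

// link stands in for the per-channel htlcManager state.
type link struct {
	channel       chanState
	pendingUpdate bool              // a single pending flag, not a counter
	send          func(interface{}) // queue a wire message to the peer
	signAndSend   func() error      // sign the next commitment, send CommitSig
}

// onCommitSig sketches the new ordering: revoke FIRST, then counter-sign
// only if the remote party initiated this state transition.
func (l *link) onCommitSig(sig []byte) error {
	if err := l.channel.ReceiveNewCommitment(sig); err != nil {
		return err
	}

	// Always ACK the new state immediately with a revocation for our
	// prior commitment.
	rev, err := l.channel.RevokeCurrentCommitment()
	if err != nil {
		return err
	}
	l.send(rev)

	// If this signature was the reply to an update we initiated, there's
	// nothing more to send; otherwise, counter-sign for the remote party.
	if l.pendingUpdate {
		l.pendingUpdate = false
		return nil
	}
	return l.signAndSend()
}
```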
Olaoluwa Osuntokun 2017-02-20 18:10:05 -08:00
parent caa464f33b
commit 4a48b91e31
1 changed file with 168 additions and 133 deletions

peer.go

@@ -419,8 +419,10 @@ out:
 			}
 		}

-		var isChanUpdate bool
-		var targetChan *wire.OutPoint
+		var (
+			isChanUpdate bool
+			targetChan   wire.OutPoint
+		)

 		switch msg := nextMsg.(type) {
 		case *lnwire.Pong:
@@ -452,19 +454,19 @@ out:
 			p.server.fundingMgr.processErrorGeneric(msg, p)

 		// TODO(roasbeef): create ChanUpdater interface for the below
-		case *lnwire.HTLCAddRequest:
+		case *lnwire.UpdateAddHTLC:
 			isChanUpdate = true
 			targetChan = msg.ChannelPoint
-		case *lnwire.HTLCSettleRequest:
+		case *lnwire.UpdateFufillHTLC:
 			isChanUpdate = true
 			targetChan = msg.ChannelPoint
-		case *lnwire.CancelHTLC:
+		case *lnwire.UpdateFailHTLC:
 			isChanUpdate = true
 			targetChan = msg.ChannelPoint
-		case *lnwire.CommitRevocation:
+		case *lnwire.RevokeAndAck:
 			isChanUpdate = true
 			targetChan = msg.ChannelPoint
-		case *lnwire.CommitSignature:
+		case *lnwire.CommitSig:
 			isChanUpdate = true
 			targetChan = msg.ChannelPoint
@@ -484,7 +486,7 @@ out:
 			// TODO(roasbeef): replace with atomic load from/into
 			// map?
 			p.barrierMtx.RLock()
-			barrier, ok := p.newChanBarriers[*targetChan]
+			barrier, ok := p.newChanBarriers[targetChan]
 			p.barrierMtx.RUnlock()
 			if ok {
 				peerLog.Tracef("waiting for chan barrier "+
@@ -501,7 +503,7 @@ out:
 			// Dispatch the commitment update message to the proper
 			// active goroutine dedicated to this channel.
 			p.htlcManMtx.Lock()
-			channel, ok := p.htlcManagers[*targetChan]
+			channel, ok := p.htlcManagers[targetChan]
 			p.htlcManMtx.Unlock()
 			if !ok {
 				peerLog.Errorf("recv'd update for unknown "+
@@ -525,7 +527,7 @@ out:
 // parameters for secp256k1.
 func (p *peer) logWireMessage(msg lnwire.Message, read bool) {
 	switch m := msg.(type) {
-	case *lnwire.CommitRevocation:
+	case *lnwire.RevokeAndAck:
 		m.NextRevocationKey.Curve = nil
 	case *lnwire.NodeAnnouncement:
 		m.NodeID.Curve = nil
@@ -831,7 +833,7 @@ func (p *peer) executeCooperativeClose(channel *lnwallet.LightningChannel) (*cha
 	if err != nil {
 		return nil, err
 	}

-	closeReq := lnwire.NewCloseRequest(chanPoint, closeSig)
+	closeReq := lnwire.NewCloseRequest(*chanPoint, closeSig)
 	p.queueMsg(closeReq, nil)

 	return txid, nil
@@ -990,7 +992,7 @@ func (p *peer) handleRemoteClose(req *lnwire.CloseRequest) {
 		peerLog.Errorf("unable to wipe channel: %v", err)
 	}

-	p.server.breachArbiter.settledContracts <- req.ChannelPoint
+	p.server.breachArbiter.settledContracts <- &req.ChannelPoint
 }

 // wipeChannel removes the passed channel from all indexes associated with the
@@ -1048,8 +1050,8 @@ func wipeChannel(p *peer, channel *lnwallet.LightningChannel) error {
 // channel to signal the payment requester once the payment has been fully
 // fufilled.
 type pendingPayment struct {
-	htlc  *lnwire.HTLCAddRequest
-	index uint32
+	htlc  *lnwire.UpdateAddHTLC
+	index uint64

 	err chan error
 }
@@ -1060,35 +1062,34 @@ type pendingPayment struct {
 type commitmentState struct {
 	// htlcsToSettle is a list of preimages which allow us to settle one or
 	// many of the pending HTLCs we've received from the upstream peer.
-	htlcsToSettle map[uint32]*channeldb.Invoice
+	htlcsToSettle map[uint64]*channeldb.Invoice

 	// htlcsToCancel is a set of HTLCs identified by their log index which
 	// are to be cancelled upon the next state transition.
-	htlcsToCancel map[uint32]lnwire.CancelReason
+	htlcsToCancel map[uint64]lnwire.FailCode

 	// cancelReasons stores the reason why a particular HTLC was cancelled.
 	// The index of the HTLC within the log is mapped to the cancellation
 	// reason. This value is used to thread the proper error through to the
 	// htlcSwitch, or subsystem that initiated the HTLC.
-	cancelReasons map[uint32]lnwire.CancelReason
+	cancelReasons map[uint64]lnwire.FailCode

-	// TODO(roasbeef): use once trickle+batch logic is in
 	pendingBatch []*pendingPayment

+	// pendingUpdate is a bool which indicates if we have a pending state
+	// update outstanding which has not yet been ACK'd.
+	pendingUpdate bool
+
 	// clearedHTCLs is a map of outgoing HTLCs we've committed to in our
 	// chain which have not yet been settled by the upstream peer.
-	clearedHTCLs map[uint32]*pendingPayment
+	clearedHTCLs map[uint64]*pendingPayment

-	// numUnAcked is a counter tracking the number of unacked changes we've
-	// sent. A change is acked once we receive a new update to our local
-	// chain from the remote peer.
-	numUnAcked uint32
-
 	// logCommitTimer is a timer which is sent upon if we go an interval
 	// without receiving/sending a commitment update. It's role is to
 	// ensure both chains converge to identical state in a timely manner.
 	// TODO(roasbeef): timer should be >> then RTT
-	logCommitTimer <-chan time.Time
+	logCommitTimer *time.Timer
+	logCommitTick  <-chan time.Time

 	// switchChan is a channel used to send packets to the htlc switch for
 	// forwarding.
@@ -1105,7 +1106,7 @@ type commitmentState struct {
 	// the log, and when it's locked into the commitment state of both
 	// chains. Once locked in, the processed packet is sent to the switch
 	// along with the HTLC to forward the packet to the next hop.
-	pendingCircuits map[uint32]*sphinx.ProcessedPacket
+	pendingCircuits map[uint64]*sphinx.ProcessedPacket

 	channel   *lnwallet.LightningChannel
 	chanPoint *wire.OutPoint
@@ -1144,13 +1145,14 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
 	state := &commitmentState{
 		channel:         channel,
 		chanPoint:       channel.ChannelPoint(),
-		clearedHTCLs:    make(map[uint32]*pendingPayment),
-		htlcsToSettle:   make(map[uint32]*channeldb.Invoice),
-		htlcsToCancel:   make(map[uint32]lnwire.CancelReason),
-		cancelReasons:   make(map[uint32]lnwire.CancelReason),
-		pendingCircuits: make(map[uint32]*sphinx.ProcessedPacket),
+		clearedHTCLs:    make(map[uint64]*pendingPayment),
+		htlcsToSettle:   make(map[uint64]*channeldb.Invoice),
+		htlcsToCancel:   make(map[uint64]lnwire.FailCode),
+		cancelReasons:   make(map[uint64]lnwire.FailCode),
+		pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket),
 		sphinx:          p.server.sphinx,
 		switchChan:      htlcPlex,
+		logCommitTimer:  time.NewTimer(300 * time.Millisecond),
 	}

 	// TODO(roasbeef): check to see if able to settle any currently pending
@@ -1158,7 +1160,8 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
 	// * also need signals when new invoices are added by the
 	//   invoiceRegistry

-	batchTimer := time.Tick(10 * time.Millisecond)
+	batchTimer := time.NewTicker(50 * time.Millisecond)
+	defer batchTimer.Stop()
 out:
 	for {
 		select {
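
One detail worth noting in the hunk above: time.Tick returns a channel whose underlying Ticker can never be stopped, so a ticker created per htlcManager goroutine would leak when the goroutine exits. time.NewTicker plus a deferred Stop avoids that. A minimal sketch of the pattern, with stand-in names rather than the actual peer.go code:

```go
package sketch

import "time"

// runBatchLoop shows the ticker pattern adopted above: NewTicker with a
// deferred Stop releases the ticker when the goroutine exits, whereas
// time.Tick offers no handle to stop it.
func runBatchLoop(quit <-chan struct{}, flushBatch func()) {
	batchTimer := time.NewTicker(50 * time.Millisecond)
	defer batchTimer.Stop()

	for {
		select {
		case <-batchTimer.C:
			// Periodically attempt to commit any batched updates.
			flushBatch()
		case <-quit:
			return
		}
	}
}
```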
@@ -1173,6 +1176,7 @@ out:
 			p.server.breachArbiter.settledContracts <- state.chanPoint
 			break out
+
 		case <-channel.ForceCloseSignal:
 			// TODO(roasbeef): path never taken now that server
 			// force closes's directly?
@@ -1180,8 +1184,8 @@ out:
 				"closed, disconnecting from peerID(%x)",
 				state.chanPoint, p.id)
 			break out

-		// TODO(roasbeef): prevent leaking ticker?
-		case <-state.logCommitTimer:
+		case <-state.logCommitTick:
 			// If we haven't sent or received a new commitment
 			// update in some time, check to see if we have any
 			// pending updates we need to commit. If so, then send
@@ -1192,15 +1196,14 @@ out:
 				continue
 			}

-			if sent, err := p.updateCommitTx(state); err != nil {
-				peerLog.Errorf("unable to update "+
-					"commitment: %v", err)
+			if err := p.updateCommitTx(state, false); err != nil {
+				peerLog.Errorf("unable to update commitment: %v",
+					err)
 				p.Disconnect()
 				break out
-			} else if sent {
-				state.numUnAcked += 1
 			}

-		case <-batchTimer:
+		case <-batchTimer.C:
 			// If the current batch is empty, then we have no work
 			// here.
 			if len(state.pendingBatch) == 0 {
@@ -1212,18 +1215,16 @@ out:
 			// If the send was unsuccessful, then abandon the
 			// update, waiting for the revocation window to open
 			// up.
-			if sent, err := p.updateCommitTx(state); err != nil {
+			if err := p.updateCommitTx(state, false); err != nil {
 				peerLog.Errorf("unable to update "+
 					"commitment: %v", err)
 				p.Disconnect()
 				break out
-			} else if !sent {
-				continue
 			}

-			state.numUnAcked += 1
-
 		case pkt := <-downstreamLink:
 			p.handleDownStreamPkt(state, pkt)

 		case msg, ok := <-upstreamLink:
 			// If the upstream message link is closed, this signals
 			// that the channel itself is being closed, therefore
@@ -1285,12 +1286,12 @@ func (p *peer) sendInitMsg() error {
 func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 	var isSettle bool
 	switch htlc := pkt.msg.(type) {
-	case *lnwire.HTLCAddRequest:
+	case *lnwire.UpdateAddHTLC:
 		// A new payment has been initiated via the
 		// downstream channel, so we add the new HTLC
 		// to our local log, then update the commitment
 		// chains.
-		htlc.ChannelPoint = state.chanPoint
+		htlc.ChannelPoint = *state.chanPoint
 		index, err := state.channel.AddHTLC(htlc)
 		if err != nil {
 			// TODO: possibly perform fallback/retry logic
@@ -1304,7 +1305,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 			// HTLC adding failed we should do the reverse
 			// operation.
 			htlcSwitch := p.server.htlcSwitch
-			htlcSwitch.UpdateLink(htlc.ChannelPoint, pkt.amt)
+			htlcSwitch.UpdateLink(&htlc.ChannelPoint, pkt.amt)
 			return
 		}
@@ -1316,11 +1317,11 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 			err:   pkt.err,
 		})

-	case *lnwire.HTLCSettleRequest:
-		// An HTLC we forward to the switch has just settle somehere
+	case *lnwire.UpdateFufillHTLC:
+		// An HTLC we forward to the switch has just settled somewhere
 		// upstream. Therefore we settle the HTLC within the our local
 		// state machine.
-		pre := htlc.RedemptionProofs[0]
+		pre := htlc.PaymentPreimage
 		logIndex, err := state.channel.SettleHTLC(pre)
 		if err != nil {
 			// TODO(roasbeef): broadcast on-chain
@@ -1332,18 +1333,18 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 		// With the HTLC settled, we'll need to populate the wire
 		// message to target the specific channel and HTLC to be
 		// cancelled.
-		htlc.ChannelPoint = state.chanPoint
-		htlc.HTLCKey = lnwire.HTLCKey(logIndex)
+		htlc.ChannelPoint = *state.chanPoint
+		htlc.ID = logIndex

 		// Then we send the HTLC settle message to the connected peer
 		// so we can continue the propagation of the settle message.
 		p.queueMsg(htlc, nil)
 		isSettle = true

-	case *lnwire.CancelHTLC:
+	case *lnwire.UpdateFailHTLC:
 		// An HTLC cancellation has been triggered somewhere upstream,
 		// we'll remove then HTLC from our local state machine.
-		logIndex, err := state.channel.CancelHTLC(pkt.payHash)
+		logIndex, err := state.channel.FailHTLC(pkt.payHash)
 		if err != nil {
 			peerLog.Errorf("unable to cancel HTLC: %v", err)
 			return
@@ -1353,8 +1354,8 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 		// message to target the specific channel and HTLC to be
 		// cancelled. The "Reason" field will have already been set
 		// within the switch.
-		htlc.ChannelPoint = state.chanPoint
-		htlc.HTLCKey = lnwire.HTLCKey(logIndex)
+		htlc.ChannelPoint = *state.chanPoint
+		htlc.ID = logIndex

 		// Finally, we send the HTLC message to the peer which
 		// initially created the HTLC.
@@ -1362,20 +1363,16 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
 		isSettle = true
 	}

-	// If this newly added update exceeds the max batch size for adds, or
+	// If this newly added update exceeds the min batch size for adds, or
 	// this is a settle request, then initiate an update.
 	// TODO(roasbeef): enforce max HTLCs in flight limit
 	if len(state.pendingBatch) >= 10 || isSettle {
-		if sent, err := p.updateCommitTx(state); err != nil {
+		if err := p.updateCommitTx(state, false); err != nil {
 			peerLog.Errorf("unable to update "+
 				"commitment: %v", err)
 			p.Disconnect()
 			return
-		} else if !sent {
-			return
 		}
-
-		state.numUnAcked += 1
 	}
 }
@@ -1386,10 +1383,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 	switch htlcPkt := msg.(type) {
 	// TODO(roasbeef): timeouts
 	//  * fail if can't parse sphinx mix-header
-	case *lnwire.HTLCAddRequest:
+	case *lnwire.UpdateAddHTLC:
 		// Before adding the new HTLC to the state machine, parse the
 		// onion object in order to obtain the routing information.
-		blobReader := bytes.NewReader(htlcPkt.OnionBlob)
+		blobReader := bytes.NewReader(htlcPkt.OnionBlob[:])
 		onionPkt := &sphinx.OnionPacket{}
 		if err := onionPkt.Decode(blobReader); err != nil {
 			peerLog.Errorf("unable to decode onion pkt: %v", err)
@@ -1415,7 +1412,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 		// a replay attacks. In the case of a replay, an attacker is
 		// *forced* to use the same payment hash twice, thereby losing
 		// their money entirely.
-		rHash := htlcPkt.RedemptionHashes[0][:]
+		rHash := htlcPkt.PaymentHash[:]
 		sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt, rHash)
 		if err != nil {
 			// If we're unable to parse the Sphinx packet, then
@@ -1431,7 +1428,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 		// attempt to see if we have an invoice locally which'll allow
 		// us to settle this HTLC.
 		case sphinx.ExitNode:
-			rHash := htlcPkt.RedemptionHashes[0]
+			rHash := htlcPkt.PaymentHash
 			invoice, err := p.server.invoices.LookupInvoice(rHash)
 			if err != nil {
 				// If we're the exit node, but don't recognize
@@ -1469,9 +1466,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 			peerLog.Errorf("mal formed onion packet")
 			state.htlcsToCancel[index] = lnwire.SphinxParseError
 		}
-	case *lnwire.HTLCSettleRequest:
-		pre := htlcPkt.RedemptionProofs[0]
-		idx := uint32(htlcPkt.HTLCKey)
+
+	case *lnwire.UpdateFufillHTLC:
+		pre := htlcPkt.PaymentPreimage
+		idx := htlcPkt.ID
 		if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil {
 			// TODO(roasbeef): broadcast on-chain
 			peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
@@ -1481,50 +1479,68 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {

 		// TODO(roasbeef): add preimage to DB in order to swipe
 		// repeated r-values
-	case *lnwire.CancelHTLC:
-		idx := uint32(htlcPkt.HTLCKey)
-		if err := state.channel.ReceiveCancelHTLC(idx); err != nil {
+	case *lnwire.UpdateFailHTLC:
+		idx := htlcPkt.ID
+		if err := state.channel.ReceiveFailHTLC(idx); err != nil {
 			peerLog.Errorf("unable to recv HTLC cancel: %v", err)
 			p.Disconnect()
 			return
 		}

-		state.cancelReasons[idx] = htlcPkt.Reason
+		state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0])

-	case *lnwire.CommitSignature:
+	case *lnwire.CommitSig:
 		// We just received a new update to our local commitment chain,
 		// validate this new commitment, closing the link if invalid.
-		// TODO(roasbeef): use uint64 for indexes?
-		logIndex := uint32(htlcPkt.LogIndex)
+		// TODO(roasbeef): redundant re-serialization
 		sig := htlcPkt.CommitSig.Serialize()
-		if err := state.channel.ReceiveNewCommitment(sig, logIndex); err != nil {
+		if err := state.channel.ReceiveNewCommitment(sig); err != nil {
 			peerLog.Errorf("unable to accept new commitment: %v", err)
 			p.Disconnect()
 			return
 		}

-		if state.numUnAcked > 0 {
-			state.numUnAcked -= 1
-			// TODO(roasbeef): only start if numUnacked == 0?
-			state.logCommitTimer = time.Tick(300 * time.Millisecond)
-		} else {
-			if _, err := p.updateCommitTx(state); err != nil {
-				peerLog.Errorf("unable to update "+
-					"commitment: %v", err)
-				p.Disconnect()
-				return
-			}
-		}
-
-		// Finally, since we just accepted a new state, send the remote
-		// peer a revocation for our prior state.
+		// As we've just accepted a new state, we'll now immediately
+		// send the remote peer a revocation for our prior state.
 		nextRevocation, err := state.channel.RevokeCurrentCommitment()
 		if err != nil {
-			peerLog.Errorf("unable to revoke current commitment: %v", err)
+			peerLog.Errorf("unable to revoke commitment: %v", err)
 			return
 		}
 		p.queueMsg(nextRevocation, nil)

-	case *lnwire.CommitRevocation:
+		// If we just initiated a state transition, and we were waiting
+		// for a reply from the remote peer, then we don't need to
+		// respond with a subsequent CommitSig message. So we toggle
+		// the `pendingUpdate` bool, and set a timer to wake us up in
+		// the future to check if we have any updates we need to
+		// commit.
+		if state.pendingUpdate {
+			state.pendingUpdate = false
+
+			if !state.logCommitTimer.Stop() {
+				select {
+				case <-state.logCommitTimer.C:
+				default:
+				}
+			}
+
+			state.logCommitTimer.Reset(300 * time.Millisecond)
+			state.logCommitTick = state.logCommitTimer.C
+			return
+		}
+
+		// Otherwise, the remote party initiated the state transition,
+		// so we'll reply with a signature to provide them with their
+		// version of the latest commitment state.
+		if err := p.updateCommitTx(state, true); err != nil {
+			peerLog.Errorf("unable to update commitment: %v", err)
+			p.Disconnect()
+			return
+		}
+
+	case *lnwire.RevokeAndAck:
 		// We've received a revocation from the remote chain, if valid,
 		// this moves the remote chain forward, and expands our
 		// revocation window.
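
The `pendingUpdate` branch above (and `updateCommitTx` further down) rely on Go's stop-drain-reset idiom for *time.Timer: if Stop returns false the timer already fired, so the stale tick must be drained before Reset, or a later receive on the channel would see a spurious wakeup. A standalone sketch of the idiom, using a hypothetical helper rather than anything from the diff:

```go
package sketch

import "time"

// rearmTimer safely restarts t for a fresh interval and returns its tick
// channel. If t already fired, the stale value is drained non-blockingly
// so the next receive only sees the new tick.
func rearmTimer(t *time.Timer, d time.Duration) <-chan time.Time {
	if !t.Stop() {
		select {
		case <-t.C: // consume the stale tick, if any
		default:
		}
	}
	t.Reset(d)
	return t.C
}
```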
@@ -1541,7 +1557,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 		// existing) that the payment has been fully fulfilled.
 		var bandwidthUpdate btcutil.Amount
 		settledPayments := make(map[lnwallet.PaymentHash]struct{})
-		cancelledHtlcs := make(map[uint32]struct{})
+		cancelledHtlcs := make(map[uint64]struct{})
 		for _, htlc := range htlcsToForward {
 			parentIndex := htlc.ParentIndex
 			if p, ok := state.clearedHTCLs[parentIndex]; ok {
@@ -1554,7 +1570,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {

 				// Otherwise, the HTLC failed, so we propagate
 				// the error back to the potential caller.
-				case lnwallet.Cancel:
+				case lnwallet.Fail:
 					errMsg := state.cancelReasons[parentIndex]
 					p.err <- errors.New(errMsg.String())
 				}
@@ -1581,10 +1597,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 				continue
 			}

-			settleMsg := &lnwire.HTLCSettleRequest{
-				ChannelPoint:     state.chanPoint,
-				HTLCKey:          lnwire.HTLCKey(logIndex),
-				RedemptionProofs: [][32]byte{preimage},
+			settleMsg := &lnwire.UpdateFufillHTLC{
+				ChannelPoint:    *state.chanPoint,
+				ID:              logIndex,
+				PaymentPreimage: preimage,
 			}
 			p.queueMsg(settleMsg, nil)
@@ -1604,17 +1620,17 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 				continue
 			}

-			logIndex, err := state.channel.CancelHTLC(htlc.RHash)
+			logIndex, err := state.channel.FailHTLC(htlc.RHash)
 			if err != nil {
 				peerLog.Errorf("unable to cancel htlc: %v", err)
 				p.Disconnect()
 				continue
 			}

-			cancelMsg := &lnwire.CancelHTLC{
-				ChannelPoint: state.chanPoint,
-				HTLCKey:      lnwire.HTLCKey(logIndex),
-				Reason:       reason,
+			cancelMsg := &lnwire.UpdateFailHTLC{
+				ChannelPoint: *state.chanPoint,
+				ID:           logIndex,
+				Reason:       []byte{byte(reason)},
 			}
 			p.queueMsg(cancelMsg, nil)
 			delete(state.htlcsToCancel, htlc.Index)
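
As the hunks above and the earlier `UpdateFailHTLC` handler show, the Reason field is now an opaque byte slice on the wire: this commit packs the lnwire.FailCode into its first byte when sending and unpacks it on receive. A tiny sketch of that round-trip, with a stand-in type rather than lnwire's:

```go
package sketch

// FailCode stands in for lnwire.FailCode (a small enum).
type FailCode uint8

// encodeReason packs a fail code into the opaque Reason format used when
// sending UpdateFailHTLC in this commit.
func encodeReason(code FailCode) []byte {
	return []byte{byte(code)}
}

// decodeReason unpacks the first byte of the opaque reason, mirroring
// state.cancelReasons[idx] = lnwire.FailCode(htlcPkt.Reason[0]) above.
func decodeReason(reason []byte) FailCode {
	return FailCode(reason[0])
}
```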
@@ -1670,13 +1686,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 		// With all the settle updates added to the local and remote
 		// HTLC logs, initiate a state transition by updating the
 		// remote commitment chain.
-		if sent, err := p.updateCommitTx(state); err != nil {
+		if err := p.updateCommitTx(state, false); err != nil {
 			peerLog.Errorf("unable to update commitment: %v", err)
 			p.Disconnect()
 			return
-		} else if sent {
-			// TODO(roasbeef): wait to delete from htlcsToSettle?
-			state.numUnAcked += 1
 		}

 		// Notify the invoiceRegistry of the invoices we just settled
@@ -1694,39 +1707,57 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
 // updateCommitTx signs, then sends an update to the remote peer adding a new
 // commitment to their commitment chain which includes all the latest updates
 // we've received+processed up to this point.
-func (p *peer) updateCommitTx(state *commitmentState) (bool, error) {
-	sigTheirs, logIndexTheirs, err := state.channel.SignNextCommitment()
+func (p *peer) updateCommitTx(state *commitmentState, reply bool) error {
+	sigTheirs, err := state.channel.SignNextCommitment()
 	if err == lnwallet.ErrNoWindow {
 		peerLog.Tracef("revocation window exhausted, unable to send %v",
 			len(state.pendingBatch))
-		return false, nil
+		return nil
 	} else if err != nil {
-		return false, err
+		return err
 	}

 	parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256())
 	if err != nil {
-		return false, fmt.Errorf("unable to parse sig: %v", err)
+		return fmt.Errorf("unable to parse sig: %v", err)
 	}

-	commitSig := &lnwire.CommitSignature{
-		ChannelPoint: state.chanPoint,
+	commitSig := &lnwire.CommitSig{
+		ChannelPoint: *state.chanPoint,
 		CommitSig:    parsedSig,
-		LogIndex:     uint64(logIndexTheirs),
 	}
 	p.queueMsg(commitSig, nil)

-	// Move all pending updates to the map of cleared HTLCs, clearing out
-	// the set of pending updates.
+	// As we've just cleared out a batch, move all pending updates to the
+	// map of cleared HTLCs, clearing out the set of pending updates.
 	for _, update := range state.pendingBatch {
-		// TODO(roasbeef): add parsed next-hop info to pending batch
-		// for multi-hop forwarding
 		state.clearedHTCLs[update.index] = update
 	}

-	state.logCommitTimer = nil
+	// We've just initiated a state transition, so attempt to stop the
+	// logCommitTimer. If the timer already ticked, then we'll consume the
+	// value, dropping it.
+	if state.logCommitTimer != nil && !state.logCommitTimer.Stop() {
+		select {
+		case <-state.logCommitTimer.C:
+		default:
+		}
+	}
+	state.logCommitTick = nil

+	// Finally, clear out the current batch, and flip the pendingUpdate
+	// bool to indicate we're waiting for a commitment signature.
 	// TODO(roasbeef): re-slice instead to avoid GC?
 	state.pendingBatch = nil
-	return true, nil
+
+	// If this isn't a reply to a state transition initiated by the
+	// remote node, then we toggle the `pendingUpdate` bool to indicate
+	// that we're waiting for a CommitSig in response.
+	if !reply {
+		state.pendingUpdate = true
+	}
+
+	return nil
 }
@@ -1736,13 +1767,14 @@ func (p *peer) updateCommitTx(state *commitmentState) (bool, error) {
 // logEntryToHtlcPkt converts a particular Lightning Commitment Protocol (LCP)
 func logEntryToHtlcPkt(chanPoint wire.OutPoint,
 	pd *lnwallet.PaymentDescriptor,
 	onionPkt *sphinx.ProcessedPacket,
-	reason lnwire.CancelReason) (*htlcPacket, error) {
+	reason lnwire.FailCode) (*htlcPacket, error) {

 	pkt := &htlcPacket{}

 	// TODO(roasbeef): alter after switch to log entry interface
 	var msg lnwire.Message
 	switch pd.EntryType {
+
 	case lnwallet.Add:
 		// TODO(roasbeef): timeout, onion blob, etc
 		var b bytes.Buffer
@@ -1750,21 +1782,24 @@ func logEntryToHtlcPkt(chanPoint wire.OutPoint,
 			return nil, err
 		}

-		msg = &lnwire.HTLCAddRequest{
-			Amount:           btcutil.Amount(pd.Amount),
-			RedemptionHashes: [][32]byte{pd.RHash},
-			OnionBlob:        b.Bytes(),
+		htlc := &lnwire.UpdateAddHTLC{
+			Amount:      btcutil.Amount(pd.Amount),
+			PaymentHash: pd.RHash,
 		}
+		copy(htlc.OnionBlob[:], b.Bytes())
+		msg = htlc

 	case lnwallet.Settle:
-		msg = &lnwire.HTLCSettleRequest{
-			RedemptionProofs: [][32]byte{pd.RPreimage},
+		msg = &lnwire.UpdateFufillHTLC{
+			PaymentPreimage: pd.RPreimage,
 		}
-	case lnwallet.Cancel:
+
+	case lnwallet.Fail:
 		// For cancellation messages, we'll also need to set the rHash
 		// within the htlcPacket so the switch knows on which outbound
 		// link to forward the cancellation message
-		msg = &lnwire.CancelHTLC{
-			Reason: reason,
+		msg = &lnwire.UpdateFailHTLC{
+			Reason: []byte{byte(reason)},
 		}
 		pkt.payHash = pd.RHash
 	}