diff --git a/htlcswitch.go b/htlcswitch.go index 5b9a6b7c..73d4d9f6 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -12,6 +12,7 @@ import ( "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" + "github.com/lightningnetwork/lnd/lnwallet" "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/chaincfg/chainhash" @@ -30,6 +31,77 @@ var ( zeroBytes [32]byte ) +// boundedLinkChan is a simple wrapper around a link's communication channel +// that bounds the total flow into and through the channel. Channels attached +// the link have a value which defines the max number of pending HTLC's present +// within the commitment transaction. Using this struct we establish a +// synchronization primitive that ensure we don't send additional htlcPackets +// to a link if the max limit has een reached. Once HTLC's are cleared from the +// commitment transaction, slots are freed up and more can proceed. +type boundedLinkChan struct { + // slots is a buffered channel whose buffer is the total number of + // outstanding HTLC's we can add to a link's commitment transaction. + // This channel is essentially used as a semaphore. + slots chan struct{} + + // linkChan is a channel that is connected to the channel state machine + // for a link. The switch will send adds, settles, and cancels over + // this channel. + linkChan chan *htlcPacket +} + +// newBoundedChan makes a new boundedLinkChan that has numSlots free slots that +// are depleted on each send until a slot is re-stored. linkChan is the +// underlying channel that will be sent upon. +func newBoundedLinkChan(numSlots uint32, + linkChan chan *htlcPacket) *boundedLinkChan { + + b := &boundedLinkChan{ + slots: make(chan struct{}, numSlots), + linkChan: linkChan, + } + + b.restoreSlots(numSlots) + return b +} + +// sendAndConsume sends a packet to the linkChan and consumes a single token in +// the process. +// +// TODO(roasbeef): add error fall through case? +func (b *boundedLinkChan) sendAndConsume(pkt *htlcPacket) { + <-b.slots + b.linkChan <- pkt +} + +// sendAndRestore sends a packet to the linkChan and consumes a single token in +// the process. This method is called when the switch sends either a cancel or +// settle HTLC message to the link. +func (b *boundedLinkChan) sendAndRestore(pkt *htlcPacket) { + b.linkChan <- pkt + b.slots <- struct{}{} +} + +// consumeSlot consumes a single slot from the bounded channel. This method is +// called once the switch receives a new htlc add message from a link right +// before forwarding it to the next hop. +func (b *boundedLinkChan) consumeSlot() { + <-b.slots +} + +// restoreSlot restores a single slots to the bounded channel. This method is +// called once the switch receives an HTLC cancel or settle from a link. +func (b *boundedLinkChan) restoreSlot() { + b.slots <- struct{}{} +} + +// restoreSlots adds numSlots additional slots to the bounded channel. +func (b *boundedLinkChan) restoreSlots(numSlots uint32) { + for i := uint32(0); i < numSlots; i++ { + b.slots <- struct{}{} + } +} + // link represents an active channel capable of forwarding HTLCs. Each // active channel registered with the htlc switch creates a new link which will // be used for forwarding outgoing HTLCs. The link also has additional @@ -40,7 +112,7 @@ type link struct { availableBandwidth int64 // atomic - linkChan chan *htlcPacket + *boundedLinkChan peer *peer @@ -66,7 +138,8 @@ type htlcPacket struct { preImage chan [32]byte - err chan error + err chan error + done chan struct{} } // circuitKey uniquely identifies an active Sphinx (onion routing) circuit @@ -214,6 +287,7 @@ func (h *htlcSwitch) Stop() error { // alternative error is returned. func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) ([32]byte, error) { htlcPkt.err = make(chan error, 1) + htlcPkt.done = make(chan struct{}) htlcPkt.preImage = make(chan [32]byte, 1) h.outgoingPayments <- htlcPkt @@ -277,7 +351,9 @@ out: hswcLog.Tracef("Sending %v to %x", amt, dest[:]) go func() { - link.linkChan <- htlcPkt + link.sendAndConsume(htlcPkt) + <-htlcPkt.done + link.restoreSlot() }() n := atomic.AddInt64(&link.availableBandwidth, @@ -345,6 +421,11 @@ out: settleLink := h.chanIndex[pkt.srcLink] h.chanIndexMtx.RUnlock() + // As the link now has a new HTLC that's been + // propagated to us, we'll consume a slot from + // it's bounded channel. + settleLink.consumeSlot() + // If the link we're attempting to forward the // HTLC over has insufficient capacity, then // we'll cancel the HTLC as the payment cannot @@ -361,11 +442,13 @@ out: payHash: payHash, msg: &lnwire.UpdateFailHTLC{ Reason: []byte{uint8(lnwire.InsufficientCapacity)}, - }, - err: make(chan error, 1), + }, err: make(chan error, 1), } - settleLink.linkChan <- pkt + // Send the cancel message along the + // link, restoring a slot in the + // bounded channel in the process. + settleLink.sendAndRestore(pkt) continue } @@ -385,11 +468,12 @@ out: // to the clearing link within the circuit to // continue propagating the HTLC across the // network. - circuit.clear.linkChan <- &htlcPacket{ + circuit.clear.sendAndConsume(&htlcPacket{ msg: wireMsg, preImage: make(chan [32]byte, 1), err: make(chan error, 1), - } + done: make(chan struct{}), + }) // Reduce the available bandwidth for the link // as it will clear the above HTLC, increasing @@ -421,15 +505,17 @@ out: continue } + circuit.clear.restoreSlot() + hswcLog.Debugf("Closing completed onion "+ "circuit for %x: %v<->%v", rHash[:], circuit.clear.chanPoint, circuit.settle.chanPoint) - circuit.settle.linkChan <- &htlcPacket{ + circuit.settle.sendAndRestore(&htlcPacket{ msg: wireMsg, err: make(chan error, 1), - } + }) // Increase the available bandwidth for the // link as it will settle the above HTLC, @@ -459,6 +545,8 @@ out: continue } + circuit.clear.restoreSlot() + // Since an outgoing HTLC we sent on the clear // link has been cancelled, we update the // bandwidth of the clear link, restoring the @@ -473,11 +561,11 @@ out: // the error propagation by sending the // cancellation message over the link that sent // us the incoming HTLC. - circuit.settle.linkChan <- &htlcPacket{ + circuit.settle.sendAndRestore(&htlcPacket{ msg: wireMsg, payHash: pkt.payHash, err: make(chan error, 1), - } + }) delete(h.paymentCircuits, pkt.payHash) } @@ -550,11 +638,17 @@ func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) { newLink := &link{ capacity: req.linkInfo.Capacity, availableBandwidth: int64(req.linkInfo.LocalBalance), - linkChan: req.linkChan, peer: req.peer, chanPoint: chanPoint, } + // To ensure we never accidentally cause an HTLC overflow, we'll limit, + // we'll use this buffered channel as as semaphore in order to limit + // the number of outstanding HTLC's we extend to the target link. + //const numSlots = (lnwallet.MaxHTLCNumber / 2) - 1 + const numSlots = lnwallet.MaxHTLCNumber - 5 + newLink.boundedLinkChan = newBoundedLinkChan(numSlots, req.linkChan) + // First update the channel index with this new channel point. The // channel index will be used to quickly lookup channels in order to: // close them, update their link capacity, or possibly during multi-hop @@ -818,6 +912,9 @@ type linkInfoUpdateMsg struct { // within the link by the passed satoshi delta. This function may be used when // re-anchoring to boost the capacity of a channel, or once a peer settles an // HTLC invoice. -func (h *htlcSwitch) UpdateLink(chanPoint *wire.OutPoint, bandwidthDelta btcutil.Amount) { - h.linkControl <- &linkInfoUpdateMsg{chanPoint, bandwidthDelta} +func (h *htlcSwitch) UpdateLink(chanPoint *wire.OutPoint, delta btcutil.Amount) { + h.linkControl <- &linkInfoUpdateMsg{ + targetLink: chanPoint, + bandwidthDelta: delta, + } } diff --git a/peer.go b/peer.go index 8c46c986..c9fcb9ad 100644 --- a/peer.go +++ b/peer.go @@ -741,6 +741,8 @@ out: // Now that the channel is open, notify the Htlc // Switch of a new active link. + // TODO(roasbeef): register needs to account for + // in-flight htlc's on restart chanSnapShot := newChanReq.channel.StateSnapshot() downstreamLink := make(chan *htlcPacket, 10) plexChan := p.server.htlcSwitch.RegisterLink(p, @@ -1022,6 +1024,7 @@ type pendingPayment struct { preImage chan [32]byte err chan error + done chan struct{} } // commitmentState is the volatile+persistent state of an active channel's @@ -1263,16 +1266,22 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { if err != nil { // TODO: possibly perform fallback/retry logic // depending on type of error - // TODO: send a cancel message back to the htlcSwitch. peerLog.Errorf("Adding HTLC rejected: %v", err) pkt.err <- err + close(pkt.done) - // Increase the available bandwidth of the link, - // previously it was decremented and because - // HTLC adding failed we should do the reverse - // operation. - htlcSwitch := p.server.htlcSwitch - htlcSwitch.UpdateLink(&htlc.ChannelPoint, pkt.amt) + // The HTLC was unable to be added to the state + // machine, as a result, we'll signal the switch to + // cancel the pending payment. + // TODO(roasbeef): need to update link as well if local + // HTLC? + state.switchChan <- &htlcPacket{ + amt: htlc.Amount, + msg: &lnwire.UpdateFailHTLC{ + Reason: []byte{byte(0)}, + }, + srcLink: *state.chanPoint, + } return } @@ -1283,6 +1292,7 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { index: index, preImage: pkt.preImage, err: pkt.err, + done: pkt.done, }) case *lnwire.UpdateFufillHTLC: @@ -1368,6 +1378,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { index, err := state.channel.ReceiveHTLC(htlcPkt) if err != nil { peerLog.Errorf("Receiving HTLC rejected: %v", err) + p.Disconnect() return } @@ -1545,6 +1556,8 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { p.err <- errors.New(errMsg.String()) } + close(p.done) + delete(state.clearedHTCLs, htlc.ParentIndex) }