From e29d8e7e060fd4ed1d4d75f8b97e8a454fb7d7b7 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Wed, 21 Sep 2016 19:41:26 -0700 Subject: [PATCH] lnd: add support for multi-hop (Sphinx) onion routed payments MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit adds full support for multi-hop onion routed payments within the daemon. The switch has been greatly extended in order to gain the functionality required to manage Sphinx payment circuits amongst active links. A payment circuit is initiated when a link sends an HTLC add to the downstream htlcSwitch it received from the upstream peer. The switch then examines the parsed sphinx packet to set up the clear/settle ends of the circuit. Created circuits can be re-used amongst HTLC payments which share the same RHash. All bandwidth updates within a link’s internal state are now managed with atomic increments/decrements in order to avoid race conditions amongst the two goroutines the switch currently uses. Each channel’s htlcManager has also been extended to parse out the next-hop contained within Sphinx packets, and construct a proper htlcPkt such that the htlcSwitch can initiate then manage the payment circuit. --- htlcswitch.go | 255 +++++++++++++++++++++++++++++++++++++++------ invoiceregistry.go | 2 +- peer.go | 156 ++++++++++++++++++--------- 3 files changed, 330 insertions(+), 83 deletions(-) diff --git a/htlcswitch.go b/htlcswitch.go index d042438f..77acc316 100644 --- a/htlcswitch.go +++ b/htlcswitch.go @@ -7,6 +7,11 @@ import ( "sync/atomic" "time" + "golang.org/x/crypto/ripemd160" + + "github.com/btcsuite/fastsha256" + "github.com/davecgh/go-spew/spew" + "github.com/lightningnetwork/lightning-onion" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnrpc" "github.com/lightningnetwork/lnd/lnwire" @@ -28,7 +33,7 @@ const ( type link struct { capacity btcutil.Amount - availableBandwidth btcutil.Amount + availableBandwidth int64 // atomic linkChan chan *htlcPacket @@ -41,15 +46,54 @@ type link struct { // settles an active HTLC. The dest field denotes the name of the interface to // forward this htlcPacket on. type htlcPacket struct { - src wire.ShaHash + sync.RWMutex + dest wire.ShaHash + index uint32 + srcLink wire.OutPoint + onion *sphinx.ProcessedPacket + msg lnwire.Message amt btcutil.Amount err chan error } +// circuitKey uniquely identifies an active Sphinx (onion routing) circuit +// between two open channels. Currently, the rHash of the HTLC which created +// the circuit is used to uniquely identify each circuit. +type circuitKey [32]byte + +// paymentCircuit represents an active Sphinx (onion routing) circuit between +// two active links within the htlcSwitch. A payment circuit is created once a +// link forwards an HTLC add request which initites the creation of the ciruit. +// The onion routing informtion contained within this message is used to +// identify the settle/clear ends of the circuit. A circuit may be re-used (not +// torndown) in the case that multiple HTLC's with the send RHash are sent. +type paymentCircuit struct { + // TODO(roasbeef): add reference count so know when to delete? + // * atomic int re + // * due to same r-value being re-used? + + // NOTE: This integer must be used *atomically*. + refCount uint32 + + // clear is the link the htlcSwitch will forward the HTLC add message + // that initiated the circuit to. Once the message is forwarded, the + // payment circuit is considered "active" from the POV of the switch as + // both the incoming/outgoing channels have the cleared HTLC within + // their latest state. + clear *link + + // settle is the link the htlcSwitch will forward the HTLC settle it + // receives from the outgoing peer to. Once the switch forwards the + // settle message to this link, the payment circuit is considered + // complete unless the reference count on the circuit is greater than + // 1. + settle *link +} + // HtlcSwitch is a central messaging bus for all incoming/outgoing HTLC's. // Connected peers with active channels are treated as named interfaces which // refer to active channels as links. A link is the switche's message @@ -59,6 +103,7 @@ type htlcPacket struct { // HTLC's, forwarding HTLC's initiated from within the daemon, and additionally // splitting up incoming/outgoing HTLC's to a particular interface amongst many // links (payment fragmentation). +// TODO(roasbeef): active sphinx circuits need to be synced to disk type htlcSwitch struct { started int32 // atomic shutdown int32 // atomic @@ -66,17 +111,38 @@ type htlcSwitch struct { // chanIndex maps a channel's outpoint to a link which contains // additional information about the channel, and additionally houses a // pointer to the peer mangaing the channel. - chanIndex map[wire.OutPoint]*link + chanIndexMtx sync.RWMutex + chanIndex map[wire.OutPoint]*link // interfaces maps a node's ID to the set of links (active channels) we // currently have open with that peer. - interfaces map[wire.ShaHash][]*link + // TODO(roasbeef): combine w/ onionIndex? + interfaceMtx sync.RWMutex + interfaces map[wire.ShaHash][]*link - // TODO(roasbeef): msgs for dynamic link quality + // onionIndex is a secondary index used to properly forward a message + // to the next hop within a Sphinx circuit. + onionMtx sync.RWMutex + onionIndex map[[ripemd160.Size]byte][]*link + + // paymentCircuits maps a circuit key to an active payment circuit + // amongst two oepn channels. This map is used to properly clear/settle + // onion routed payments within the network. + paymentCircuits map[circuitKey]*paymentCircuit + + // linkControl is a channel used by connected links to notify the + // switch of a non-multi-hop triggered link state update. linkControl chan interface{} + // outgoingPayments is a channel that outgoing payments initiated by + // the RPC system. outgoingPayments chan *htlcPacket + // htlcPlex is the channel in which all connected links use to + // coordinate the setup/tear down of Sphinx (onion routing) payment + // circuits. Active links forward any add/settle messages over this + // channel each state transition, sending new adds/settles which are + // fully locked in. htlcPlex chan *htlcPacket // TODO(roasbeef): messaging chan to/from upper layer (routing - L3) @@ -92,6 +158,8 @@ func newHtlcSwitch() *htlcSwitch { return &htlcSwitch{ chanIndex: make(map[wire.OutPoint]*link), interfaces: make(map[wire.ShaHash][]*link), + onionIndex: make(map[[ripemd160.Size]byte][]*link), + paymentCircuits: make(map[circuitKey]*paymentCircuit), linkControl: make(chan interface{}), htlcPlex: make(chan *htlcPacket, htlcQueueSize), outgoingPayments: make(chan *htlcPacket, htlcQueueSize), @@ -141,11 +209,13 @@ func (h *htlcSwitch) SendHTLC(htlcPkt *htlcPacket) error { // fragmenting) incoming/outgoing HTLC's amongst all active interfaces and // their links. The duties of the forwarder are similar to that of a network // switch, in that it facilitates multi-hop payments by acting as a central -// messaging bus. Each active channel is modeled as networked device with -// meta-data such as the available payment bandwidth, and total link capacity. +// messaging bus. The switch communicates will active links to create, manage, +// and tearn down active onion routed payments.Each active channel is modeled +// as networked device with meta-data such as the available payment bandwidth, +// and total link capacity. func (h *htlcSwitch) htlcForwarder() { // TODO(roasbeef): track pending payments here instead of within each peer? - // Examine settles/timeouts from htl cplex. Add src to htlcPacket, key by + // Examine settles/timeouts from htlcPlex. Add src to htlcPacket, key by // (src, htlcKey). // TODO(roasbeef): cleared vs settled distinction @@ -157,9 +227,12 @@ out: select { case htlcPkt := <-h.outgoingPayments: dest := htlcPkt.dest + h.interfaceMtx.RLock() chanInterface, ok := h.interfaces[dest] + h.interfaceMtx.RUnlock() if !ok { - err := fmt.Errorf("Unable to locate link %x", dest) + err := fmt.Errorf("Unable to locate link %x", + dest[:]) hswcLog.Errorf(err.Error()) htlcPkt.err <- err continue @@ -171,48 +244,138 @@ out: // Handle this send request in a distinct goroutine in // order to avoid a possible deadlock between the htlc // switch and channel's htlc manager. - var sent bool for _, link := range chanInterface { // TODO(roasbeef): implement HTLC fragmentation // * avoid full channel depletion at higher // level (here) instead of within state // machine? - if link.availableBandwidth < amt { + if link.availableBandwidth < int64(amt) { continue } hswcLog.Tracef("Sending %v to %x", amt, dest[:]) - // TODO(roasbeef): peer downstream should set chanPoint - wireMsg.ChannelPoint = link.chanPoint go func() { link.linkChan <- htlcPkt }() - // TODO(roasbeef): update link info on - // timeout/settle - link.availableBandwidth -= amt - sent = true - } + n := atomic.AddInt64(&link.availableBandwidth, + -int64(amt)) + hswcLog.Tracef("Decrementing link %v bandwidth to %v", + link.chanPoint, n) - if sent { - continue + continue out } hswcLog.Errorf("Unable to send payment, insufficient capacity") htlcPkt.err <- fmt.Errorf("Insufficient capacity") case pkt := <-h.htlcPlex: - numUpdates += 1 // TODO(roasbeef): properly account with cleared vs settled - switch pkt.msg.(type) { + numUpdates += 1 + + hswcLog.Tracef("plex packet: %v", newLogClosure(func() string { + return spew.Sdump(pkt) + })) + + switch wireMsg := pkt.msg.(type) { + // A link has just forwarded us a new HTLC, therefore + // we initiate the payment circuit within our internal + // staate so we can properly forward the ultimate + // settle message. case *lnwire.HTLCAddRequest: + // Create the two ends of the payment circuit + // required to ensure completion of this new + // payment. + nextHop := pkt.onion.NextHop + h.onionMtx.RLock() + clearLink, ok := h.onionIndex[nextHop] + h.onionMtx.RUnlock() + if !ok { + hswcLog.Errorf("unable to find dest end of "+ + "circuit: %x", nextHop) + continue + } + + h.chanIndexMtx.RLock() + settleLink := h.chanIndex[pkt.srcLink] + h.chanIndexMtx.RUnlock() + + // TODO(roasbeef): examine per-hop info to decide on link? + // * check clear has enough available sat + circuit := &paymentCircuit{ + clear: clearLink[0], + settle: settleLink, + } + + cKey := circuitKey(wireMsg.RedemptionHashes[0]) + h.paymentCircuits[cKey] = circuit + + hswcLog.Debugf("Creating onion circuit for %x: %v<->%v", + cKey[:], clearLink[0].chanPoint, + settleLink.chanPoint) + + // With the circuit initiated, send the htlcPkt + // to the clearing link within the circuit to + // continue propagating the HTLC accross the + // network. + circuit.clear.linkChan <- &htlcPacket{ + msg: wireMsg, + err: make(chan error, 1), + } + + // Reduce the available bandwidth for the link + // as it will clear the above HTLC, increasing + // the limbo balance within the channel. + n := + atomic.AddInt64(&circuit.clear.availableBandwidth, + -int64(pkt.amt)) + hswcLog.Tracef("Decrementing link %v bandwidth to %v", + circuit.clear.chanPoint, n) + satRecv += pkt.amt + + // We've just received a settle message which means we + // can finalize the payment circuit by forwarding the + // settle msg to the link which initially created the + // circuit. case *lnwire.HTLCSettleRequest: + rHash := fastsha256.Sum256(wireMsg.RedemptionProofs[0][:]) + + var cKey circuitKey + copy(cKey[:], rHash[:]) + + // If we initiated the payment then there won't + // be an active circuit so continue propagating + // the settle over. Therefore, we exit early. + circuit, ok := h.paymentCircuits[cKey] + if !ok { + hswcLog.Debugf("No existing circuit "+ + "for %x", rHash[:]) + satSent += pkt.amt + continue + } + + hswcLog.Debugf("Closing completed onion "+ + "circuit for %x: %v<->%v", rHash[:], + circuit.clear.chanPoint, + circuit.settle.chanPoint) + + circuit.settle.linkChan <- &htlcPacket{ + msg: wireMsg, + err: make(chan error, 1), + } + + // Increase the available bandwidth for the + // link as it will settle the above HTLC, + // subtracting from the limbo balacne and + // incrementing its local balance. + n := atomic.AddInt64(&circuit.settle.availableBandwidth, + int64(pkt.amt)) + hswcLog.Tracef("Incrementing link %v bandwidth to %v", + circuit.settle.chanPoint, n) + satSent += pkt.amt } - - // TODO(roasbeef): parse dest/src, forward on outgoing - // link to complete multi-hop payments. case <-logTicker.C: if numUpdates == 0 { continue @@ -264,18 +427,32 @@ func (h *htlcSwitch) handleRegisterLink(req *registerLinkMsg) { chanPoint := req.linkInfo.ChannelPoint newLink := &link{ capacity: req.linkInfo.Capacity, - availableBandwidth: req.linkInfo.LocalBalance, + availableBandwidth: int64(req.linkInfo.LocalBalance), linkChan: req.linkChan, peer: req.peer, chanPoint: chanPoint, } + + h.chanIndexMtx.Lock() h.chanIndex[*chanPoint] = newLink + h.chanIndexMtx.Unlock() interfaceID := req.peer.lightningID - h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink) - hswcLog.Infof("registering new link, interface=%v, chan_point=%v, capacity=%v", - hex.EncodeToString(interfaceID[:]), chanPoint, newLink.capacity) + h.interfaceMtx.Lock() + h.interfaces[interfaceID] = append(h.interfaces[interfaceID], newLink) + h.interfaceMtx.Unlock() + + var onionId [ripemd160.Size]byte + copy(onionId[:], btcutil.Hash160(req.peer.identityPub.SerializeCompressed())) + + h.onionMtx.Lock() + h.onionIndex[onionId] = h.interfaces[interfaceID] + h.onionMtx.Unlock() + + hswcLog.Infof("registering new link, interface=%x, onion_link=%x, "+ + "chan_point=%v, capacity=%v", interfaceID[:], onionId, + chanPoint, newLink.capacity) if req.done != nil { req.done <- struct{}{} @@ -290,7 +467,10 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { hex.EncodeToString(req.chanInterface[:]), req.chanPoint) chanInterface := req.chanInterface + + h.interfaceMtx.RLock() links := h.interfaces[chanInterface] + h.interfaceMtx.RUnlock() // A request with a nil channel point indicates that all the current // links for this channel should be cleared. @@ -299,11 +479,15 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { hex.EncodeToString(chanInterface[:])) for _, link := range links { + h.chanIndexMtx.Lock() delete(h.chanIndex, *link.chanPoint) + h.chanIndexMtx.Unlock() } links = nil } else { + h.chanIndexMtx.Lock() delete(h.chanIndex, *req.chanPoint) + h.chanIndexMtx.Unlock() for i := 0; i < len(links); i++ { chanLink := links[i] @@ -317,10 +501,15 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { } } + // TODO(roasbeef): clean up/modify onion links + // * just have the interfaces index be keyed on hash160? + if len(links) == 0 { hswcLog.Infof("interface %v has no active links, destroying", hex.EncodeToString(chanInterface[:])) + h.interfaceMtx.Lock() delete(h.interfaces, chanInterface) + h.interfaceMtx.Unlock() } if req.done != nil { @@ -331,7 +520,10 @@ func (h *htlcSwitch) handleUnregisterLink(req *unregisterLinkMsg) { // handleCloseLink sends a message to the peer responsible for the target // channel point, instructing it to initiate a cooperative channel closure. func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) { + h.chanIndexMtx.RLock() targetLink, ok := h.chanIndex[*req.chanPoint] + h.chanIndexMtx.RUnlock() + if !ok { req.err <- fmt.Errorf("channel point %v not found", req.chanPoint) return @@ -345,8 +537,11 @@ func (h *htlcSwitch) handleCloseLink(req *closeLinkReq) { // handleLinkUpdate processes the link info update message by adjusting the // channels available bandwidth by the delta specified within the message. func (h *htlcSwitch) handleLinkUpdate(req *linkInfoUpdateMsg) { + h.chanIndexMtx.RLock() link := h.chanIndex[*req.targetLink] - link.availableBandwidth += req.bandwidthDelta + h.chanIndexMtx.RUnlock() + + atomic.AddInt64(&link.availableBandwidth, int64(req.bandwidthDelta)) hswcLog.Tracef("adjusting bandwidth of link %v by %v", req.targetLink, req.bandwidthDelta) diff --git a/invoiceregistry.go b/invoiceregistry.go index 752adce6..0ca22027 100644 --- a/invoiceregistry.go +++ b/invoiceregistry.go @@ -110,7 +110,7 @@ func (i *invoiceRegistry) LookupInvoice(rHash wire.ShaHash) (*channeldb.Invoice, // dbueg invoice, then this method is a nooop as debug invoices are never fully // settled. func (i *invoiceRegistry) SettleInvoice(rHash wire.ShaHash) error { - ltndLog.Debugf("Setting invoice %x", rHash[:]) + ltndLog.Debugf("Settling invoice %x", rHash[:]) // First check the in-memory debug invoice index to see if this is an // existing invoice added for debugging. diff --git a/peer.go b/peer.go index 21979253..59a40927 100644 --- a/peer.go +++ b/peer.go @@ -891,6 +891,14 @@ type commitmentState struct { // within HTLC add messages. sphinx *sphinx.Router + // pendingCircuits tracks the remote log index of the incoming HTLC's, + // mapped to the processed Sphinx packet contained within the HTLC. + // This map is used as a staging area between when an HTLC is added to + // the log, and when it's locked into the commitment state of both + // chains. Once locked in, the processed packet is sent to the switch + // along with the HTLC to forward the packet to the next hop. + pendingCircuits map[uint32]*sphinx.ProcessedPacket + channel *lnwallet.LightningChannel chanPoint *wire.OutPoint } @@ -926,12 +934,13 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel, } state := &commitmentState{ - channel: channel, - chanPoint: channel.ChannelPoint(), - clearedHTCLs: make(map[uint32]*pendingPayment), - htlcsToSettle: make(map[uint32]*channeldb.Invoice), - sphinx: p.server.sphinx, - switchChan: htlcPlex, + channel: channel, + chanPoint: channel.ChannelPoint(), + clearedHTCLs: make(map[uint32]*pendingPayment), + htlcsToSettle: make(map[uint32]*channeldb.Invoice), + pendingCircuits: make(map[uint32]*sphinx.ProcessedPacket), + sphinx: p.server.sphinx, + switchChan: htlcPlex, } // TODO(roasbeef): check to see if able to settle any currently pending @@ -1022,12 +1031,14 @@ out: // HTLC's, timeout previously cleared HTLC's, and finally to settle currently // cleared HTLC's with the upstream peer. func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { + var isSettle bool switch htlc := pkt.msg.(type) { case *lnwire.HTLCAddRequest: // A new payment has been initiated via the // downstream channel, so we add the new HTLC // to our local log, then update the commitment // chains. + htlc.ChannelPoint = state.chanPoint index := state.channel.AddHTLC(htlc) p.queueMsg(htlc, nil) @@ -1037,21 +1048,37 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) { err: pkt.err, }) - // If this newly added update exceeds the max batch size, the - // initiate an update. - // TODO(roasbeef): enforce max HTLC's in flight limit - if len(state.pendingBatch) >= 10 { - if sent, err := p.updateCommitTx(state); err != nil { - peerLog.Errorf("unable to update "+ - "commitment: %v", err) - p.Disconnect() - return - } else if !sent { - return - } - - state.numUnAcked += 1 + case *lnwire.HTLCSettleRequest: + pre := htlc.RedemptionProofs[0] + logIndex, err := state.channel.SettleHTLC(pre) + if err != nil { + // TODO(roasbeef): broadcast on-chain + peerLog.Errorf("settle for incoming HTLC rejected: %v", err) + p.Disconnect() + return } + + htlc.ChannelPoint = state.chanPoint + htlc.HTLCKey = lnwire.HTLCKey(logIndex) + + p.queueMsg(htlc, nil) + isSettle = true + } + + // If this newly added update exceeds the max batch size for adds, or + // this is a settle request, then initiate an update. + // TODO(roasbeef): enforce max HTLC's in flight limit + if len(state.pendingBatch) >= 10 || isSettle { + if sent, err := p.updateCommitTx(state); err != nil { + peerLog.Errorf("unable to update "+ + "commitment: %v", err) + p.Disconnect() + return + } else if !sent { + return + } + + state.numUnAcked += 1 } } @@ -1072,7 +1099,7 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { p.Disconnect() return } - mixHeader, err := state.sphinx.ProcessOnionPacket(onionPkt) + sphinxPacket, err := state.sphinx.ProcessOnionPacket(onionPkt) if err != nil { peerLog.Errorf("unable to process onion pkt: %v", err) p.Disconnect() @@ -1084,10 +1111,10 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // "settle" list in the event that we know the pre-image index := state.channel.ReceiveHTLC(htlcPkt) - switch mixHeader.Action { + switch sphinxPacket.Action { // We're the designated payment destination. Therefore we - // attempt to see if we have an invoice locally which'll - // allow us to settle this HTLC. + // attempt to see if we have an invoice locally which'll allow + // us to settle this HTLC. case sphinx.ExitNode: rHash := htlcPkt.RedemptionHashes[0] invoice, err := p.server.invoices.LookupInvoice(rHash) @@ -1100,11 +1127,15 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // TODO(roasbeef): check values accept if >= state.htlcsToSettle[index] = invoice - return + + // There are additional hops left within this route, so we + // track the next hop according to the index of this HTLC + // within their log. When forwarding locked-in HLTC's to the + // switch, we'll attach the routing information so the switch + // can finalize the circuit. case sphinx.MoreHops: - // TODO(roasbeef): parse out the next dest so can - // attach to packet when forwarding. - // * send cancel + error if not in rounting table + // TODO(roasbeef): send cancel + error if not in rounting table + state.pendingCircuits[index] = sphinxPacket default: peerLog.Errorf("mal formed onion packet") p.Disconnect() @@ -1163,25 +1194,12 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { return } - // We perform the HTLC forwarding to the switch in a distinct - // goroutine in order not to block the post-processing of - // HTLC's that are eligble for forwarding. - // TODO(roasbeef): don't forward if we're going to settle them - go func() { - for _, htlc := range htlcsToForward { - // Send this fully activated HTLC to the htlc - // switch to continue the chained clear/settle. - state.switchChan <- p.logEntryToHtlcPkt(htlc) - } - - }() - // If any of the htlc's eligible for forwarding are pending // settling or timeing out previous outgoing payments, then we // can them from the pending set, and signal the requster (if // existing) that the payment has been fully fulfilled. var bandwidthUpdate btcutil.Amount - var settledPayments []wire.ShaHash + settledPayments := make(map[lnwallet.PaymentHash]struct{}) numSettled := 0 for _, htlc := range htlcsToForward { if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok { @@ -1222,12 +1240,37 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { delete(state.htlcsToSettle, htlc.Index) bandwidthUpdate += invoice.Terms.Value - settledPayments = append(settledPayments, - wire.ShaHash(htlc.RHash)) + settledPayments[htlc.RHash] = struct{}{} numSettled++ } + go func() { + for _, htlc := range htlcsToForward { + // We don't need to forward any HTLC's that we + // just settled above. + if _, ok := settledPayments[htlc.RHash]; ok { + continue + } + + onionPkt := state.pendingCircuits[htlc.Index] + delete(state.pendingCircuits, htlc.Index) + + // Send this fully activated HTLC to the htlc + // switch to continue the chained clear/settle. + pkt, err := logEntryToHtlcPkt(*state.chanPoint, + htlc, onionPkt) + if err != nil { + peerLog.Errorf("unable to make htlc pkt: %v", + err) + continue + } + + state.switchChan <- pkt + } + + }() + if numSettled == 0 { return } @@ -1254,8 +1297,8 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) { // Notify the invoiceRegistry of the invoices we just settled // with this latest commitment update. - for _, invoice := range settledPayments { - err := p.server.invoices.SettleInvoice(invoice) + for invoice, _ := range settledPayments { + err := p.server.invoices.SettleInvoice(wire.ShaHash(invoice)) if err != nil { peerLog.Errorf("unable to settle invoice: %v", err) } @@ -1305,7 +1348,10 @@ func (p *peer) updateCommitTx(state *commitmentState) (bool, error) { // log entry the corresponding htlcPacket with src/dest set along with the // proper wire message. This helepr method is provided in order to aide an // htlcManager in forwarding packets to the htlcSwitch. -func (p *peer) logEntryToHtlcPkt(pd *lnwallet.PaymentDescriptor) *htlcPacket { +func logEntryToHtlcPkt(chanPoint wire.OutPoint, + pd *lnwallet.PaymentDescriptor, + onionPkt *sphinx.ProcessedPacket) (*htlcPacket, error) { + pkt := &htlcPacket{} // TODO(roasbeef): alter after switch to log entry interface @@ -1313,23 +1359,29 @@ func (p *peer) logEntryToHtlcPkt(pd *lnwallet.PaymentDescriptor) *htlcPacket { switch pd.EntryType { case lnwallet.Add: // TODO(roasbeef): timeout, onion blob, etc + var b bytes.Buffer + if err := onionPkt.Packet.Encode(&b); err != nil { + return nil, err + } + msg = &lnwire.HTLCAddRequest{ Amount: lnwire.CreditsAmount(pd.Amount), RedemptionHashes: [][32]byte{pd.RHash}, + OnionBlob: b.Bytes(), } case lnwallet.Settle: - // TODO(roasbeef): thread through preimage msg = &lnwire.HTLCSettleRequest{ - HTLCKey: lnwire.HTLCKey(pd.ParentIndex), + RedemptionProofs: [][32]byte{pd.RPreimage}, } } - // TODO(roasbeef): set dest via onion blob or state pkt.amt = pd.Amount pkt.msg = msg - pkt.src = p.lightningID - return pkt + pkt.srcLink = chanPoint + pkt.onion = onionPkt + + return pkt, nil } // TODO(roasbeef): make all start/stop mutexes a CAS