diff --git a/htlcswitch/circuit_map.go b/htlcswitch/circuit_map.go index eabb9108..01108d50 100644 --- a/htlcswitch/circuit_map.go +++ b/htlcswitch/circuit_map.go @@ -6,6 +6,7 @@ import ( "sync" "github.com/coreos/bbolt" + "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/channeldb" "github.com/lightningnetwork/lnd/lnwire" @@ -47,7 +48,8 @@ var ( type CircuitModifier interface { // OpenCircuits preemptively records a batch keystones that will mark // currently pending circuits as open. These changes can be rolled back - // on restart if the outgoing Adds do not make it into a commitment txn. + // on restart if the outgoing Adds do not make it into a commitment + // txn. OpenCircuits(...Keystone) error // TrimOpenCircuits removes a channel's open channels with htlc indexes @@ -60,11 +62,11 @@ type CircuitModifier interface { DeleteCircuits(inKeys ...CircuitKey) error } -// CircuitFwdActions represents the forwarding decision made by the circuit map, -// and is returned from CommitCircuits. The sequence of circuits provided to -// CommitCircuits is split into three subsequences, allowing the caller to do an -// in-order scan, comparing the head of each subsequence, to determine the -// decision made by the circuit map. +// CircuitFwdActions represents the forwarding decision made by the circuit +// map, and is returned from CommitCircuits. The sequence of circuits provided +// to CommitCircuits is split into three sub-sequences, allowing the caller to +// do an in-order scan, comparing the head of each subsequence, to determine +// the decision made by the circuit map. type CircuitFwdActions struct { // Adds is the subsequence of circuits that were successfully committed // in the circuit map. @@ -85,10 +87,10 @@ type CircuitMap interface { CircuitModifier // CommitCircuits attempts to add the given circuits to the circuit - // map. 
The list of circuits is split into three distinct subsequences, - // corresponding to adds, drops, and fails. Adds should be forwarded to - // the switch, while fails should be failed back locally within the - // calling link. + // map. The list of circuits is split into three distinct + // sub-sequences, corresponding to adds, drops, and fails. Adds should + // be forwarded to the switch, while fails should be failed back + // locally within the calling link. CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error) // CloseCircuit marks the circuit identified by `outKey` as closing @@ -105,8 +107,8 @@ type CircuitMap interface { // inKey. LookupCircuit(inKey CircuitKey) *PaymentCircuit - // LookupOpenCircuit queries the circuit map for a circuit identified by - // its outgoing circuit key. + // LookupOpenCircuit queries the circuit map for a circuit identified + // by its outgoing circuit key. LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit // LookupByPaymentHash queries the circuit map and returns all open @@ -124,15 +126,15 @@ type CircuitMap interface { var ( // circuitAddKey is the key used to retrieve the bucket containing - // payment circuits. A circuit records information about how to return a - // packet to the source link, potentially including an error encrypter - // for applying this hop's encryption to the payload in the reverse - // direction. + // payment circuits. A circuit records information about how to return + // a packet to the source link, potentially including an error + // encrypter for applying this hop's encryption to the payload in the + // reverse direction. circuitAddKey = []byte("circuit-adds") // circuitKeystoneKey is used to retrieve the bucket containing circuit - // keystones, which are set in place once a forwarded packet is assigned - // an index on an outgoing commitment txn. + // keystones, which are set in place once a forwarded packet is + // assigned an index on an outgoing commitment txn. 
circuitKeystoneKey = []byte("circuit-keystones") ) @@ -143,18 +145,19 @@ var ( // outgoing CircuitKey if the circuit is fully-opened. type circuitMap struct { // db provides the persistent storage engine for the circuit map. + // // TODO(conner): create abstraction to allow for the substitution of // other persistence engines. db *channeldb.DB mtx sync.RWMutex - // pending is an in-memory mapping of all half payment circuits, and - // is kept in sync with the on-disk contents of the circuit map. + // pending is an in-memory mapping of all half payment circuits, and is + // kept in sync with the on-disk contents of the circuit map. pending map[CircuitKey]*PaymentCircuit - // opened is an in-memory mapping of all full payment circuits, which is - // also synchronized with the persistent state of the circuit map. + // opened is an in-memory mapping of all full payment circuits, which + // is also synchronized with the persistent state of the circuit map. opened map[CircuitKey]*PaymentCircuit // closed is an in-memory set of circuits for which the switch has @@ -211,11 +214,12 @@ func (cm *circuitMap) initBuckets() error { }) } -// restoreMemState loads the contents of the half circuit and full circuit buckets -// from disk and reconstructs the in-memory representation of the circuit map. -// Afterwards, the state of the hash index is reconstructed using the recovered -// set of full circuits. +// restoreMemState loads the contents of the half circuit and full circuit +// buckets from disk and reconstructs the in-memory representation of the +// circuit map. Afterwards, the state of the hash index is reconstructed using +// the recovered set of full circuits. 
func (cm *circuitMap) restoreMemState() error { + log.Infof("Restoring in-memory circuit state from disk") var ( opened = make(map[CircuitKey]*PaymentCircuit) @@ -286,6 +290,9 @@ func (cm *circuitMap) restoreMemState() error { cm.opened = opened cm.closed = make(map[CircuitKey]struct{}) + log.Infof("Payment circuits loaded: num_pending=%v, num_open=%v", + len(pending), len(opened)) + // Finally, reconstruct the hash index by running through our set of // open circuits. cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{}) @@ -339,18 +346,22 @@ func (cm *circuitMap) trimAllOpenCircuits() error { } // TrimOpenCircuits removes a channel's keystones above the short chan id's -// highest committed htlc index. This has the effect of returning those circuits -// to a half-open state. Since opening of circuits is done in advance of -// actually committing the Add htlcs into a commitment txn, this allows circuits -// to be opened preemetively, since we can roll them back after any failures. +// highest committed htlc index. This has the effect of returning those +// circuits to a half-open state. Since opening of circuits is done in advance +// of actually committing the Add htlcs into a commitment txn, this allows +// circuits to be opened preemptively, since we can roll them back after any +// failures. func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID, start uint64) error { + log.Infof("Trimming open circuits for chan_id=%v, start_htlc_id=%v", + chanID, start) + var trimmedOutKeys []CircuitKey // Scan forward from the last unacked htlc id, stopping as soon as we - // don't find any more. Outgoing htlc id's must be assigned in order, so - // there should never be disjoint segments of keystones to trim. + // don't find any more. Outgoing htlc id's must be assigned in order, + // so there should never be disjoint segments of keystones to trim. 
cm.mtx.Lock() for i := start; ; i++ { outKey := CircuitKey{ @@ -440,6 +451,10 @@ func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit { func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( *CircuitFwdActions, error) { + log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string { + return spew.Sdump(circuits) + })) + actions := &CircuitFwdActions{} // If an empty list was passed, return early to avoid grabbing the lock. @@ -563,7 +578,8 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) ( // Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones // are preemptively written by an outgoing link before signing a new commitment -// state, and cements which HTLCs we are awaiting a response from a remote peer. +// state, and cements which HTLCs we are awaiting a response from a remote +// peer. type Keystone struct { InKey CircuitKey OutKey CircuitKey @@ -583,6 +599,10 @@ func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error { return nil } + log.Tracef("Opening finalized circuits: %v", newLogClosure(func() string { + return spew.Sdump(keystones) + })) + // Check that all keystones correspond to committed-but-unopened // circuits. cm.mtx.RLock() @@ -658,8 +678,7 @@ func (cm *circuitMap) addCircuitToHashIndex(c *PaymentCircuit) { // FailCircuit marks the circuit identified by `inKey` as closing in-memory, // which prevents duplicate settles/fails from completing an open circuit twice. 
-func (cm *circuitMap) FailCircuit( - inKey CircuitKey) (*PaymentCircuit, error) { +func (cm *circuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit, error) { cm.mtx.Lock() defer cm.mtx.Unlock() @@ -679,11 +698,10 @@ func (cm *circuitMap) FailCircuit( return circuit, nil } -// CloseCircuit marks the circuit identified by `outKey` as closing -// in-memory, which prevents duplicate settles/fails from completing an open +// CloseCircuit marks the circuit identified by `outKey` as closing in-memory, +// which prevents duplicate settles/fails from completing an open // circuit twice. -func (cm *circuitMap) CloseCircuit( - outKey CircuitKey) (*PaymentCircuit, error) { +func (cm *circuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error) { cm.mtx.Lock() defer cm.mtx.Unlock() @@ -709,6 +727,10 @@ func (cm *circuitMap) CloseCircuit( // circuit key. func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error { + log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string { + return spew.Sdump(inKeys) + })) + var ( closingCircuits = make(map[CircuitKey]struct{}) removedCircuits = make(map[CircuitKey]*PaymentCircuit) diff --git a/htlcswitch/link.go b/htlcswitch/link.go index 29449542..66dec4a8 100644 --- a/htlcswitch/link.go +++ b/htlcswitch/link.go @@ -9,6 +9,7 @@ import ( "crypto/sha256" + "github.com/davecgh/go-spew/spew" "github.com/go-errors/errors" "github.com/lightningnetwork/lnd/chainntnfs" "github.com/lightningnetwork/lnd/channeldb" @@ -140,8 +141,9 @@ type ChannelLinkConfig struct { // DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion // blobs, which are then used to inform how to forward an HTLC. - // NOTE: This function assumes the same set of readers and preimages are - // always presented for the same identifier. + // + // NOTE: This function assumes the same set of readers and preimages + // are always presented for the same identifier. 
DecodeHopIterators func([]byte, []DecodeHopIteratorRequest) ( []DecodeHopIteratorResponse, error) @@ -210,14 +212,15 @@ type ChannelLinkConfig struct { // BatchTicker is the ticker that determines the interval that we'll // use to check the batch to see if there're any updates we should - // flush out. By batching updates into a single commit, we attempt - // to increase throughput by maximizing the number of updates - // coalesced into a single commit. + // flush out. By batching updates into a single commit, we attempt to + // increase throughput by maximizing the number of updates coalesced + // into a single commit. BatchTicker Ticker // FwdPkgGCTicker is the ticker determining the frequency at which - // garbage collection of forwarding packages occurs. We use a time-based - // approach, as opposed to block epochs, as to not hinder syncing. + // garbage collection of forwarding packages occurs. We use a + // time-based approach, as opposed to block epochs, as to not hinder + // syncing. FwdPkgGCTicker Ticker // BatchSize is the max size of a batch of updates done to the link @@ -232,8 +235,8 @@ type ChannelLinkConfig struct { } // channelLink is the service which drives a channel's commitment update -// state-machine. In the event that an htlc needs to be propagated to another -// link, the forward handler from config is used which sends htlc to the +// state-machine. In the event that an HTLC needs to be propagated to another +// link, the forward handler from config is used which sends HTLC to the // switch. Additionally, the link encapsulate logic of commitment protocol // message ordering and updates. type channelLink struct { @@ -246,8 +249,8 @@ type channelLink struct { // current number of settles that have been sent, but not yet committed // to the commitment. // - // TODO(andrew.shvv) remove after we add additional - // BatchNumber() method in state machine. 
+ // TODO(andrew.shvv) remove after we add additional BatchNumber() + // method in state machine. batchCounter uint32 // bestHeight is the best known height of the main chain. The link will @@ -255,10 +258,22 @@ type channelLink struct { bestHeight uint32 // keystoneBatch represents a volatile list of keystones that must be - // written before attempting to sign the next commitment txn. + // written before attempting to sign the next commitment txn. These + // represent all the HTLC's forwarded to the link from the switch. Once + // we lock them into our outgoing commitment, then the circuit has a + // keystone, and is fully opened. keystoneBatch []Keystone + // openedCircuits is the set of all payment circuits that will be open + // once we make our next commitment. After making the commitment we'll + // ACK all these from our mailbox to ensure that they don't get + // re-delivered if we reconnect. openedCircuits []CircuitKey + + // closedCircuits is the set of all payment circuits that will be + // closed once we make our next commitment. After taking the commitment + // we'll ACK all these to ensure that they don't get re-delivered if we + // reconnect. closedCircuits []CircuitKey // channel is a lightning network channel to which we apply htlc @@ -526,9 +541,9 @@ func (l *channelLink) syncChanStates() error { closedCircuits []CircuitKey ) - // We've just received a ChnSync message from the remote party, - // so we'll process the message in order to determine if we - // need to re-transmit any messages to the remote party. + // We've just received a ChanSync message from the remote + // party, so we'll process the message in order to determine + // if we need to re-transmit any messages to the remote party. 
msgsToReSend, openedCircuits, closedCircuits, err = l.channel.ProcessChanSyncMsg(remoteChanSyncMsg) if err != nil { @@ -539,7 +554,8 @@ func (l *channelLink) syncChanStates() error { } // Repopulate any identifiers for circuits that may have been - // opened or unclosed. + // opened or unclosed. This may happen if we needed to + // retransmit a commitment signature message. l.openedCircuits = openedCircuits l.closedCircuits = closedCircuits @@ -574,10 +590,10 @@ func (l *channelLink) syncChanStates() error { } // resolveFwdPkgs loads any forwarding packages for this link from disk, and -// reprocesses them in order. The primary goal is to make sure that any HTLCs we -// previously received are reinstated in memory, and forwarded to the switch if -// necessary. After a restart, this will also delete any previously completed -// packages. +// reprocesses them in order. The primary goal is to make sure that any HTLCs +// we previously received are reinstated in memory, and forwarded to the switch +// if necessary. After a restart, this will also delete any previously +// completed packages. func (l *channelLink) resolveFwdPkgs() error { fwdPkgs, err := l.channel.LoadFwdPkgs() if err != nil { @@ -623,10 +639,10 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { } // Otherwise this is either a new package or one has gone through - // processing, but contains htlcs that need to be restored in memory. We - // replay this forwarding package to make sure our local mem state is - // resurrected, we mimic any original responses back to the remote - // party, and reforward the relevant HTLCs to the switch. + // processing, but contains htlcs that need to be restored in memory. + // We replay this forwarding package to make sure our local mem state + // is resurrected, we mimic any original responses back to the remote + // party, and re-forward the relevant HTLCs to the switch. 
// If the package is fully acked but not completed, it must still have // settles and fails to propagate. @@ -637,10 +653,10 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) { l.processRemoteSettleFails(fwdPkg, settleFails) } - // Finally, replay *ALL ADDS* in this forwarding package. The downstream - // logic is able to filter out any duplicates, but we must shove the - // entire, original set of adds down the pipeline so that the batch of - // adds presented to the sphinx router does not ever change. + // Finally, replay *ALL ADDS* in this forwarding package. The + // downstream logic is able to filter out any duplicates, but we must + // shove the entire, original set of adds down the pipeline so that the + // batch of adds presented to the sphinx router does not ever change. var needUpdate bool if !fwdPkg.AckFilter.IsFull() { adds := lnwallet.PayDescsFromRemoteLogUpdates( @@ -717,10 +733,10 @@ func (l *channelLink) htlcManager() { // commitment txn. We use the next local htlc index as the cut off // point, since all indexes below that are committed. // - // NOTE: This is automatically done by the switch when it starts up, but - // is necessary to prevent inconsistencies in the case that the link - // flaps. This is a result of a link's life-cycle being shorter than - // that of the switch. + // NOTE: This is automatically done by the switch when it starts up, + // but is necessary to prevent inconsistencies in the case that the + // link flaps. This is a result of a link's life-cycle being shorter + // than that of the switch. localHtlcIndex := l.channel.LocalHtlcIndex() err := l.cfg.Circuits.TrimOpenCircuits(l.ShortChanID(), localHtlcIndex) if err != nil { @@ -737,7 +753,6 @@ func (l *channelLink) htlcManager() { // re-synchronize state with the remote peer. settledHtlcs is a map of // HTLC's that we re-settled as part of the channel state sync. if l.cfg.SyncStates { - // TODO(roasbeef): need to ensure haven't already settled? 
if err := l.syncChanStates(); err != nil { l.errorf("unable to synchronize channel states: %v", err) l.fail(err.Error()) @@ -745,9 +760,9 @@ func (l *channelLink) htlcManager() { } } - // With the channel states synced, we now reset the mailbox to ensure we - // start processing all unacked packets in order. This is done here to - // ensure that all acknowledgments that occur during channel + // With the channel states synced, we now reset the mailbox to ensure + // we start processing all unacked packets in order. This is done here + // to ensure that all acknowledgments that occur during channel // resynchronization have taken affect, causing us only to pull unacked // packets after starting to read from the downstream mailbox. l.mailBox.ResetPackets() @@ -827,7 +842,8 @@ out: // TODO(roasbeef): remove all together go func() { chanPoint := l.channel.ChannelPoint() - if err := l.cfg.Peer.WipeChannel(chanPoint); err != nil { + err := l.cfg.Peer.WipeChannel(chanPoint) + if err != nil { log.Errorf("unable to wipe channel %v", err) } }() @@ -895,6 +911,7 @@ out: l.overflowQueue.AddPkt(pkt) continue } + l.handleDownStreamPkt(pkt, false) // A message from the connected peer was just received. This @@ -956,7 +973,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // The channels spare bandwidth is fully allocated, so // we'll put this HTLC into the overflow queue. case lnwallet.ErrMaxHTLCNumber: - log.Infof("Downstream htlc add update with "+ + l.infof("Downstream htlc add update with "+ "payment hash(%x) have been added to "+ "reprocessing queue, batch: %v", htlc.PaymentHash[:], @@ -969,7 +986,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // machine, as a result, we'll signal the switch to // cancel the pending payment. 
default: - log.Warnf("Unable to handle downstream add HTLC: %v", err) + l.warnf("Unable to handle downstream add HTLC: %v", err) var ( localFailure = false @@ -984,7 +1001,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { var b bytes.Buffer err := lnwire.EncodeFailure(&b, failure, 0) if err != nil { - log.Errorf("unable to encode failure: %v", err) + l.errorf("unable to encode failure: %v", err) return } reason = lnwire.OpaqueReason(b.Bytes()) @@ -993,7 +1010,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { var err error reason, err = pkt.obfuscator.EncryptFirstHop(failure) if err != nil { - log.Errorf("unable to obfuscate error: %v", err) + l.errorf("unable to obfuscate error: %v", err) return } } @@ -1027,7 +1044,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { } } - log.Tracef("Received downstream htlc: payment_hash=%x, "+ + l.tracef("Received downstream htlc: payment_hash=%x, "+ "local_log_index=%v, batch_size=%v", htlc.PaymentHash[:], index, l.batchCounter+1) @@ -1047,7 +1064,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) { // An HTLC we forward to the switch has just settled somewhere // upstream. Therefore we settle the HTLC within the our local // state machine. 
- closedCircuitRef := pkt.inKey() if err := l.channel.SettleHTLC( htlc.PaymentPreimage, @@ -1138,7 +1154,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { return } - log.Tracef("Receive upstream htlc with payment hash(%x), "+ + l.tracef("Receive upstream htlc with payment hash(%x), "+ "assigning index: %v", msg.PaymentHash[:], index) case *lnwire.UpdateFulfillHTLC: @@ -1159,7 +1175,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { go func() { err := l.cfg.PreimageCache.AddPreimage(pre[:]) if err != nil { - log.Errorf("unable to add preimage=%x to "+ + l.errorf("unable to add preimage=%x to "+ "cache", pre[:]) } }() @@ -1191,7 +1207,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { // form. var b bytes.Buffer if err := lnwire.EncodeFailure(&b, failure, 0); err != nil { - log.Errorf("unable to encode malformed error: %v", err) + l.errorf("unable to encode malformed error: %v", err) return } @@ -1330,12 +1346,12 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) { } -// ackDownStreamPackets is responsible for removing htlcs from a link's -// mailbox for packets delivered from server, and cleaning up any circuits -// closed by signing a previous commitment txn. This method ensures that the -// circuits are removed from the circuit map before removing them from the -// link's mailbox, otherwise it could be possible for some circuit to be missed -// if this link flaps. +// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox +// for packets delivered from server, and cleaning up any circuits closed by +// signing a previous commitment txn. This method ensures that the circuits are +// removed from the circuit map before removing them from the link's mailbox, +// otherwise it could be possible for some circuit to be missed if this link +// flaps. // // The `forgive` flag allows this method to tolerate restarts, and ignores // errors that could be caused by a previous circuit deletion. 
Under normal @@ -1346,15 +1362,15 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error { // previous commitment signature. This will prevent the Adds from being // replayed if this link disconnects. for _, inKey := range l.openedCircuits { - // In order to test the sphinx replay logic of the remote party, - // unsafe replay does not acknowledge the packets from the - // mailbox. We can then force a replay of any Add packets held - // in memory by disconnecting and reconnecting the link. + // In order to test the sphinx replay logic of the remote + // party, unsafe replay does not acknowledge the packets from + // the mailbox. We can then force a replay of any Add packets + // held in memory by disconnecting and reconnecting the link. if l.cfg.UnsafeReplay { continue } - l.debugf("Removing Add packet %s from mailbox", inKey) + l.debugf("removing Add packet %s from mailbox", inKey) l.mailBox.AckPacket(inKey) } @@ -1370,11 +1386,11 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error { case ErrUnknownCircuit: if forgive { // After a restart, we may have already removed this - // circuit. Since it shouldn't be possible for a circuit - // to be closed by different htlcs, we assume this error - // signals that the whole batch was successfully - // removed. - l.warnf("Forgiving unknown circuit error after " + + // circuit. Since it shouldn't be possible for a + // circuit to be closed by different htlcs, we assume + // this error signals that the whole batch was + // successfully removed. + l.warnf("forgiving unknown circuit error after " + "attempting deletion, circuit was probably " + "removed before shutting down.") break @@ -1392,9 +1408,9 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error { // Settle/Fails in the mailbox to ensure they do not get redelivered // after startup. 
If forgive is enabled and we've reached this point, // the circuits must have been removed at some point, so it is now safe - // to unqueue the corresponding Settle/Fails. + // to un-queue the corresponding Settle/Fails. for _, inKey := range l.closedCircuits { - l.debugf("Removing Fail/Settle packet %s from mailbox", inKey) + l.debugf("removing Fail/Settle packet %s from mailbox", inKey) l.mailBox.AckPacket(inKey) } @@ -1526,24 +1542,29 @@ type getBandwidthCmd struct { func (l *channelLink) Bandwidth() lnwire.MilliSatoshi { channelBandwidth := l.channel.AvailableBalance() overflowBandwidth := l.overflowQueue.TotalHtlcAmount() - linkBandwidth := channelBandwidth - overflowBandwidth - reserve := lnwire.NewMSatFromSatoshis(l.channel.LocalChanReserve()) - // If the channel reserve is greater than the total available - // balance of the link, just return 0. + // To compute the total bandwidth, we'll take the current available + // bandwidth, then subtract the overflow bandwidth as we'll eventually + // also need to evaluate those HTLC's once space on the commitment + // transaction is free. + linkBandwidth := channelBandwidth - overflowBandwidth + + // If the channel reserve is greater than the total available balance + // of the link, just return 0. + reserve := lnwire.NewMSatFromSatoshis(l.channel.LocalChanReserve()) if linkBandwidth < reserve { return 0 } - // Else the amount that is available to flow through the link at - // this point is the available balance minus the reserve amount - // we are required to keep as collateral. + // Else the amount that is available to flow through the link at this + // point is the available balance minus the reserve amount we are + // required to keep as collateral. return linkBandwidth - reserve } -// AttachMailBox updates the current mailbox used by this link, and hooks up the -// mailbox's message and packet outboxes to the link's upstream and downstream -// chans, respectively. 
+// AttachMailBox updates the current mailbox used by this link, and hooks up +// the mailbox's message and packet outboxes to the link's upstream and +// downstream chans, respectively. func (l *channelLink) AttachMailBox(mailbox MailBox) { l.Lock() l.mailBox = mailbox @@ -1611,6 +1632,7 @@ func (l *channelLink) String() string { func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error { l.tracef("received switch packet inkey=%v, outkey=%v", pkt.inKey(), pkt.outKey()) + l.mailBox.AddPacket(pkt) return nil } @@ -1669,12 +1691,16 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg, var switchPackets []*htlcPacket for i, pd := range settleFails { - // Skip any settles or fails that have already been acknowledged - // by the incoming link that originated the forwarded Add. + // Skip any settles or fails that have already been + // acknowledged by the incoming link that originated the + // forwarded Add. if fwdPkg.SettleFailFilter.Contains(uint16(i)) { continue } + // TODO(roasbeef): rework log entries to a shared + // interface. + switch pd.EntryType { // A settle for an HTLC we previously forwarded HTLC has been @@ -1759,8 +1785,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, // Atomically decode the incoming htlcs, simultaneously checking for // replay attempts. A particular index in the returned, spare list of - // channel iterators should only be used if the failure code at the same - // index is lnwire.FailCodeNone. + // channel iterators should only be used if the failure code at the + // same index is lnwire.FailCodeNone. 
decodeResps, sphinxErr := l.cfg.DecodeHopIterators( fwdPkg.ID(), decodeReqs, ) @@ -1781,11 +1807,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg, if fwdPkg.State == channeldb.FwdStateProcessed && fwdPkg.AckFilter.Contains(idx) { - // If this index is already found in the ack filter, the - // response to this forwarding decision has already been - // committed by one of our commitment txns. ADDs in this - // state are waiting for the rest of the fwding package - // to get acked before being garbage collected. + // If this index is already found in the ack filter, + // the response to this forwarding decision has already + // been committed by one of our commitment txns. ADDs + // in this state are waiting for the rest of the fwding + // package to get acked before being garbage collected. continue } @@ -2313,14 +2339,14 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) { l.handleBatchFwdErrs(errChan) } -// handleBatchFwdErrs waits on the given errChan until it is closed, logging the -// errors returned from any unsuccessful forwarding attempts. +// handleBatchFwdErrs waits on the given errChan until it is closed, logging +// the errors returned from any unsuccessful forwarding attempts. func (l *channelLink) handleBatchFwdErrs(errChan chan error) { for { err, ok := <-errChan if !ok { - // Err chan has been drained or switch is shutting down. - // Either way, return. + // Err chan has been drained or switch is shutting + // down. Either way, return. return } @@ -2335,9 +2361,8 @@ func (l *channelLink) handleBatchFwdErrs(errChan chan error) { // sendHTLCError functions cancels HTLC and send cancel message back to the // peer from which HTLC was received. 
-func (l *channelLink) sendHTLCError(htlcIndex uint64, - failure lnwire.FailureMessage, e ErrorEncrypter, - sourceRef *channeldb.AddRef) { +func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMessage, + e ErrorEncrypter, sourceRef *channeldb.AddRef) { reason, err := e.EncryptFirstHop(failure) if err != nil { diff --git a/htlcswitch/log.go b/htlcswitch/log.go index 69e71e4b..bd4593a3 100644 --- a/htlcswitch/log.go +++ b/htlcswitch/log.go @@ -24,3 +24,19 @@ func DisableLog() { func UseLogger(logger btclog.Logger) { log = logger } + +// logClosure is used to provide a closure over expensive logging operations so +// they don't have to be performed when the logging level doesn't warrant it. +type logClosure func() string + +// String invokes the underlying function and returns the result. +func (c logClosure) String() string { + return c() +} + +// newLogClosure returns a new closure over a function that returns a string +// which itself provides a Stringer interface so that it can be used with the +// logging system. +func newLogClosure(c func() string) logClosure { + return logClosure(c) +} diff --git a/htlcswitch/mailbox.go b/htlcswitch/mailbox.go index 64924fe3..01474bee 100644 --- a/htlcswitch/mailbox.go +++ b/htlcswitch/mailbox.go @@ -247,6 +247,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { select { case msgDone := <-m.msgReset: m.wireMessages.Init() + close(msgDone) case <-m.quit: m.wireCond.L.Unlock() @@ -261,8 +262,13 @@ func (m *memoryMailBox) mailCourier(cType courierType) { m.pktCond.Wait() select { + // Resetting the packet queue means just moving + // our pointer to the front. This ensures that + // any un-ACK'd messages are re-delivered upon + // reconnect. 
case pktDone := <-m.pktReset: m.pktHead = m.htlcPkts.Front() + close(pktDone) case <-m.quit: m.pktCond.L.Unlock() @@ -272,17 +278,22 @@ func (m *memoryMailBox) mailCourier(cType courierType) { } } - // Grab the datum off the front of the queue, shifting the - // slice's reference down one in order to remove the datum from - // the queue. var ( nextPkt *htlcPacket nextMsg lnwire.Message ) switch cType { + // Grab the datum off the front of the queue, shifting the + // slice's reference down one in order to remove the datum from + // the queue. case wireCourier: entry := m.wireMessages.Front() nextMsg = m.wireMessages.Remove(entry).(lnwire.Message) + + // For packets, we actually never remove an item until it has + // been ACK'd by the link. This ensures that if a read packet + // doesn't make it into a commitment, then it'll be + // re-delivered once the link comes back online. case pktCourier: nextPkt = m.pktHead.Value.(*htlcPacket) m.pktHead = m.pktHead.Next() @@ -297,6 +308,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { m.pktCond.L.Unlock() } + // With the next message obtained, we'll now select to attempt // to deliver the message. If we receive a kill signal, then // we'll bail out. switch cType { @@ -307,6 +319,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { m.wireCond.L.Lock() m.wireMessages.Init() m.wireCond.L.Unlock() + close(msgDone) case <-m.quit: return @@ -319,6 +332,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) { m.pktCond.L.Lock() m.pktHead = m.htlcPkts.Front() m.pktCond.L.Unlock() + close(pktDone) case <-m.quit: return diff --git a/htlcswitch/switch.go b/htlcswitch/switch.go index 9089a95e..6fac494a 100644 --- a/htlcswitch/switch.go +++ b/htlcswitch/switch.go @@ -346,8 +346,8 @@ func (s *Switch) SendHTLC(nextNode [33]byte, htlc *lnwire.UpdateAddHTLC, "while waiting for payment result") } - // Remove circuit since we are about to complete an - // add/fail of this HTLC. 
+ // Remove circuit since we are about to complete an add/fail of this + // HTLC. if teardownErr := s.teardownCircuit(response); teardownErr != nil { log.Warnf("unable to teardown circuit %s: %v", response.inKey(), teardownErr) @@ -409,7 +409,7 @@ type updatePoliciesCmd struct { // updateLinkPolicies attempts to update the forwarding policies for the set of // passed links identified by their channel points. If a nil set of channel // points is passed, then the forwarding policies for all active links will be -// updated.k +// updated. func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error { log.Debugf("Updating link policies: %v", spew.Sdump(c)) @@ -440,9 +440,9 @@ func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error { return nil } -// forward is used in order to find next channel link and apply htlc -// update. Also this function is used by channel links itself in order to -// forward the update after it has been included in the channel. +// forward is used in order to find next channel link and apply htlc update. +// Also this function is used by channel links itself in order to forward the +// update after it has been included in the channel. func (s *Switch) forward(packet *htlcPacket) error { switch htlc := packet.htlc.(type) { case *lnwire.UpdateAddHTLC: @@ -475,10 +475,11 @@ func (s *Switch) forward(packet *htlcPacket) error { return s.route(packet) } -// ForwardPackets adds a list of packets to the switch for processing. Fails and -// settles are added on a first past, simultaneously constructing circuits for -// any adds. After persisting the circuits, another pass of the adds is given to -// forward them through the router. +// ForwardPackets adds a list of packets to the switch for processing. Fails +// and settles are added on a first pass, simultaneously constructing circuits +// for any adds. After persisting the circuits, another pass of the adds is +// given to forward them through the router.
+// // NOTE: This method guarantees that the returned err chan will eventually be // closed. The receiver should read on the channel until receiving such a // signal. @@ -911,7 +912,6 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error { return destination.HandleSwitchPacket(packet) case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC: - // If the source of this packet has not been set, use the // circuit map to lookup the origin. circuit, err := s.closeCircuit(packet) @@ -1032,10 +1032,9 @@ func (s *Switch) failAddPacket(packet *htlcPacket, // set the incoming chan and htlc ID of the given packet if the source was // found, and will properly [re]encrypt any failure messages. func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { - - // If the packet has its source, that means it was failed locally by the - // outgoing link. We fail it here to make sure only one response makes - // it through the switch. + // If the packet has its source, that means it was failed locally by + // the outgoing link. We fail it here to make sure only one response + // makes it through the switch. if pkt.hasSource { circuit, err := s.circuits.FailCircuit(pkt.inKey()) switch err { @@ -1044,15 +1043,16 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { case nil: return circuit, nil - // Circuit was previously closed, but has not been deleted. We'll just - // drop this response until the circuit has been fully removed. + // Circuit was previously closed, but has not been deleted. + // We'll just drop this response until the circuit has been + // fully removed. case ErrCircuitClosing: return nil, err - // Failed to close circuit because it does not exist. This is likely - // because the circuit was already successfully closed. Since - // this packet failed locally, there is no forwarding package - // entry to acknowledge. + // Failed to close circuit because it does not exist. 
This is + // likely because the circuit was already successfully closed. + // Since this packet failed locally, there is no forwarding + // package entry to acknowledge. case ErrUnknownCircuit: return nil, err @@ -1062,8 +1062,8 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { } } - // Otherwise, this is packet was received from the remote party. - // Use circuit map to find the incoming link to receive the settle/fail. + // Otherwise, this packet was received from the remote party. Use + // circuit map to find the incoming link to receive the settle/fail. circuit, err := s.circuits.CloseCircuit(pkt.outKey()) switch err { @@ -1074,6 +1074,16 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { pkt.circuit = circuit pkt.sourceRef = &circuit.AddRef + pktType := "SETTLE" + if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok { + pktType = "FAIL" + } + + log.Debugf("Closed completed %s circuit for %x: "+ + "(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash, + pkt.incomingChanID, pkt.incomingHTLCID, + pkt.outgoingChanID, pkt.outgoingHTLCID) + return circuit, nil // Circuit was previously closed, but has not been deleted. We'll just @@ -1105,6 +1115,10 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) { } } +// ackSettleFail is used by the switch to ACK any settle/fail entries in the +// forwarding package of the outgoing link for a payment circuit. We do this if +// we're the originator of the payment, so the link stops attempting to +// re-broadcast.
func (s *Switch) ackSettleFail(settleFailRef channeldb.SettleFailRef) error { return s.cfg.DB.Update(func(tx *bolt.Tx) error { return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRef) @@ -1149,6 +1163,7 @@ func (s *Switch) teardownCircuit(pkt *htlcPacket) error { default: log.Debugf("Tearing down incomplete circuit with %s for inkey=%v", pktType, pkt.inKey()) + err := s.circuits.DeleteCircuits(pkt.inKey()) if err != nil { log.Warnf("Failed to tear down pending %s circuit for %x: "+