mirror of https://github.com/BTCPrivate/lnd.git
htlcswitch: add additional comments and logging
This commit is contained in:
parent
c285bb5814
commit
1af8fa9367
|
@ -6,6 +6,7 @@ import (
|
||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/coreos/bbolt"
|
"github.com/coreos/bbolt"
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/go-errors/errors"
|
"github.com/go-errors/errors"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
"github.com/lightningnetwork/lnd/lnwire"
|
"github.com/lightningnetwork/lnd/lnwire"
|
||||||
|
@ -47,7 +48,8 @@ var (
|
||||||
type CircuitModifier interface {
|
type CircuitModifier interface {
|
||||||
// OpenCircuits preemptively records a batch keystones that will mark
|
// OpenCircuits preemptively records a batch keystones that will mark
|
||||||
// currently pending circuits as open. These changes can be rolled back
|
// currently pending circuits as open. These changes can be rolled back
|
||||||
// on restart if the outgoing Adds do not make it into a commitment txn.
|
// on restart if the outgoing Adds do not make it into a commitment
|
||||||
|
// txn.
|
||||||
OpenCircuits(...Keystone) error
|
OpenCircuits(...Keystone) error
|
||||||
|
|
||||||
// TrimOpenCircuits removes a channel's open channels with htlc indexes
|
// TrimOpenCircuits removes a channel's open channels with htlc indexes
|
||||||
|
@ -60,11 +62,11 @@ type CircuitModifier interface {
|
||||||
DeleteCircuits(inKeys ...CircuitKey) error
|
DeleteCircuits(inKeys ...CircuitKey) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// CircuitFwdActions represents the forwarding decision made by the circuit map,
|
// CircuitFwdActions represents the forwarding decision made by the circuit
|
||||||
// and is returned from CommitCircuits. The sequence of circuits provided to
|
// map, and is returned from CommitCircuits. The sequence of circuits provided
|
||||||
// CommitCircuits is split into three subsequences, allowing the caller to do an
|
// to CommitCircuits is split into three sub-sequences, allowing the caller to
|
||||||
// in-order scan, comparing the head of each subsequence, to determine the
|
// do an in-order scan, comparing the head of each subsequence, to determine
|
||||||
// decision made by the circuit map.
|
// the decision made by the circuit map.
|
||||||
type CircuitFwdActions struct {
|
type CircuitFwdActions struct {
|
||||||
// Adds is the subsequence of circuits that were successfully committed
|
// Adds is the subsequence of circuits that were successfully committed
|
||||||
// in the circuit map.
|
// in the circuit map.
|
||||||
|
@ -85,10 +87,10 @@ type CircuitMap interface {
|
||||||
CircuitModifier
|
CircuitModifier
|
||||||
|
|
||||||
// CommitCircuits attempts to add the given circuits to the circuit
|
// CommitCircuits attempts to add the given circuits to the circuit
|
||||||
// map. The list of circuits is split into three distinct subsequences,
|
// map. The list of circuits is split into three distinct
|
||||||
// corresponding to adds, drops, and fails. Adds should be forwarded to
|
// sub-sequences, corresponding to adds, drops, and fails. Adds should
|
||||||
// the switch, while fails should be failed back locally within the
|
// be forwarded to the switch, while fails should be failed back
|
||||||
// calling link.
|
// locally within the calling link.
|
||||||
CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error)
|
CommitCircuits(circuit ...*PaymentCircuit) (*CircuitFwdActions, error)
|
||||||
|
|
||||||
// CloseCircuit marks the circuit identified by `outKey` as closing
|
// CloseCircuit marks the circuit identified by `outKey` as closing
|
||||||
|
@ -105,8 +107,8 @@ type CircuitMap interface {
|
||||||
// inKey.
|
// inKey.
|
||||||
LookupCircuit(inKey CircuitKey) *PaymentCircuit
|
LookupCircuit(inKey CircuitKey) *PaymentCircuit
|
||||||
|
|
||||||
// LookupOpenCircuit queries the circuit map for a circuit identified by
|
// LookupOpenCircuit queries the circuit map for a circuit identified
|
||||||
// its outgoing circuit key.
|
// by its outgoing circuit key.
|
||||||
LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit
|
LookupOpenCircuit(outKey CircuitKey) *PaymentCircuit
|
||||||
|
|
||||||
// LookupByPaymentHash queries the circuit map and returns all open
|
// LookupByPaymentHash queries the circuit map and returns all open
|
||||||
|
@ -124,15 +126,15 @@ type CircuitMap interface {
|
||||||
|
|
||||||
var (
|
var (
|
||||||
// circuitAddKey is the key used to retrieve the bucket containing
|
// circuitAddKey is the key used to retrieve the bucket containing
|
||||||
// payment circuits. A circuit records information about how to return a
|
// payment circuits. A circuit records information about how to return
|
||||||
// packet to the source link, potentially including an error encrypter
|
// a packet to the source link, potentially including an error
|
||||||
// for applying this hop's encryption to the payload in the reverse
|
// encrypter for applying this hop's encryption to the payload in the
|
||||||
// direction.
|
// reverse direction.
|
||||||
circuitAddKey = []byte("circuit-adds")
|
circuitAddKey = []byte("circuit-adds")
|
||||||
|
|
||||||
// circuitKeystoneKey is used to retrieve the bucket containing circuit
|
// circuitKeystoneKey is used to retrieve the bucket containing circuit
|
||||||
// keystones, which are set in place once a forwarded packet is assigned
|
// keystones, which are set in place once a forwarded packet is
|
||||||
// an index on an outgoing commitment txn.
|
// assigned an index on an outgoing commitment txn.
|
||||||
circuitKeystoneKey = []byte("circuit-keystones")
|
circuitKeystoneKey = []byte("circuit-keystones")
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -143,18 +145,19 @@ var (
|
||||||
// outgoing CircuitKey if the circuit is fully-opened.
|
// outgoing CircuitKey if the circuit is fully-opened.
|
||||||
type circuitMap struct {
|
type circuitMap struct {
|
||||||
// db provides the persistent storage engine for the circuit map.
|
// db provides the persistent storage engine for the circuit map.
|
||||||
|
//
|
||||||
// TODO(conner): create abstraction to allow for the substitution of
|
// TODO(conner): create abstraction to allow for the substitution of
|
||||||
// other persistence engines.
|
// other persistence engines.
|
||||||
db *channeldb.DB
|
db *channeldb.DB
|
||||||
|
|
||||||
mtx sync.RWMutex
|
mtx sync.RWMutex
|
||||||
|
|
||||||
// pending is an in-memory mapping of all half payment circuits, and
|
// pending is an in-memory mapping of all half payment circuits, and is
|
||||||
// is kept in sync with the on-disk contents of the circuit map.
|
// kept in sync with the on-disk contents of the circuit map.
|
||||||
pending map[CircuitKey]*PaymentCircuit
|
pending map[CircuitKey]*PaymentCircuit
|
||||||
|
|
||||||
// opened is an in-memory mapping of all full payment circuits, which is
|
// opened is an in-memory mapping of all full payment circuits, which
|
||||||
// also synchronized with the persistent state of the circuit map.
|
// is also synchronized with the persistent state of the circuit map.
|
||||||
opened map[CircuitKey]*PaymentCircuit
|
opened map[CircuitKey]*PaymentCircuit
|
||||||
|
|
||||||
// closed is an in-memory set of circuits for which the switch has
|
// closed is an in-memory set of circuits for which the switch has
|
||||||
|
@ -211,11 +214,12 @@ func (cm *circuitMap) initBuckets() error {
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
// restoreMemState loads the contents of the half circuit and full circuit buckets
|
// restoreMemState loads the contents of the half circuit and full circuit
|
||||||
// from disk and reconstructs the in-memory representation of the circuit map.
|
// buckets from disk and reconstructs the in-memory representation of the
|
||||||
// Afterwards, the state of the hash index is reconstructed using the recovered
|
// circuit map. Afterwards, the state of the hash index is reconstructed using
|
||||||
// set of full circuits.
|
// the recovered set of full circuits.
|
||||||
func (cm *circuitMap) restoreMemState() error {
|
func (cm *circuitMap) restoreMemState() error {
|
||||||
|
log.Infof("Restoring in-memory circuit state from disk")
|
||||||
|
|
||||||
var (
|
var (
|
||||||
opened = make(map[CircuitKey]*PaymentCircuit)
|
opened = make(map[CircuitKey]*PaymentCircuit)
|
||||||
|
@ -286,6 +290,9 @@ func (cm *circuitMap) restoreMemState() error {
|
||||||
cm.opened = opened
|
cm.opened = opened
|
||||||
cm.closed = make(map[CircuitKey]struct{})
|
cm.closed = make(map[CircuitKey]struct{})
|
||||||
|
|
||||||
|
log.Infof("Payment circuits loaded: num_pending=%v, num_open=%v",
|
||||||
|
len(pending), len(opened))
|
||||||
|
|
||||||
// Finally, reconstruct the hash index by running through our set of
|
// Finally, reconstruct the hash index by running through our set of
|
||||||
// open circuits.
|
// open circuits.
|
||||||
cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{})
|
cm.hashIndex = make(map[[32]byte]map[CircuitKey]struct{})
|
||||||
|
@ -339,18 +346,22 @@ func (cm *circuitMap) trimAllOpenCircuits() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// TrimOpenCircuits removes a channel's keystones above the short chan id's
|
// TrimOpenCircuits removes a channel's keystones above the short chan id's
|
||||||
// highest committed htlc index. This has the effect of returning those circuits
|
// highest committed htlc index. This has the effect of returning those
|
||||||
// to a half-open state. Since opening of circuits is done in advance of
|
// circuits to a half-open state. Since opening of circuits is done in advance
|
||||||
// actually committing the Add htlcs into a commitment txn, this allows circuits
|
// of actually committing the Add htlcs into a commitment txn, this allows
|
||||||
// to be opened preemetively, since we can roll them back after any failures.
|
// circuits to be opened preemptively, since we can roll them back after any
|
||||||
|
// failures.
|
||||||
func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
|
func (cm *circuitMap) TrimOpenCircuits(chanID lnwire.ShortChannelID,
|
||||||
start uint64) error {
|
start uint64) error {
|
||||||
|
|
||||||
|
log.Infof("Trimming open circuits for chan_id=%v, start_htlc_id=%v",
|
||||||
|
chanID, start)
|
||||||
|
|
||||||
var trimmedOutKeys []CircuitKey
|
var trimmedOutKeys []CircuitKey
|
||||||
|
|
||||||
// Scan forward from the last unacked htlc id, stopping as soon as we
|
// Scan forward from the last unacked htlc id, stopping as soon as we
|
||||||
// don't find any more. Outgoing htlc id's must be assigned in order, so
|
// don't find any more. Outgoing htlc id's must be assigned in order,
|
||||||
// there should never be disjoint segments of keystones to trim.
|
// so there should never be disjoint segments of keystones to trim.
|
||||||
cm.mtx.Lock()
|
cm.mtx.Lock()
|
||||||
for i := start; ; i++ {
|
for i := start; ; i++ {
|
||||||
outKey := CircuitKey{
|
outKey := CircuitKey{
|
||||||
|
@ -440,6 +451,10 @@ func (cm *circuitMap) LookupByPaymentHash(hash [32]byte) []*PaymentCircuit {
|
||||||
func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
|
func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
|
||||||
*CircuitFwdActions, error) {
|
*CircuitFwdActions, error) {
|
||||||
|
|
||||||
|
log.Tracef("Committing fresh circuits: %v", newLogClosure(func() string {
|
||||||
|
return spew.Sdump(circuits)
|
||||||
|
}))
|
||||||
|
|
||||||
actions := &CircuitFwdActions{}
|
actions := &CircuitFwdActions{}
|
||||||
|
|
||||||
// If an empty list was passed, return early to avoid grabbing the lock.
|
// If an empty list was passed, return early to avoid grabbing the lock.
|
||||||
|
@ -563,7 +578,8 @@ func (cm *circuitMap) CommitCircuits(circuits ...*PaymentCircuit) (
|
||||||
|
|
||||||
// Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones
|
// Keystone is a tuple binding an incoming and outgoing CircuitKey. Keystones
|
||||||
// are preemptively written by an outgoing link before signing a new commitment
|
// are preemptively written by an outgoing link before signing a new commitment
|
||||||
// state, and cements which HTLCs we are awaiting a response from a remote peer.
|
// state, and cements which HTLCs we are awaiting a response from a remote
|
||||||
|
// peer.
|
||||||
type Keystone struct {
|
type Keystone struct {
|
||||||
InKey CircuitKey
|
InKey CircuitKey
|
||||||
OutKey CircuitKey
|
OutKey CircuitKey
|
||||||
|
@ -583,6 +599,10 @@ func (cm *circuitMap) OpenCircuits(keystones ...Keystone) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
log.Tracef("Opening finalized circuits: %v", newLogClosure(func() string {
|
||||||
|
return spew.Sdump(keystones)
|
||||||
|
}))
|
||||||
|
|
||||||
// Check that all keystones correspond to committed-but-unopened
|
// Check that all keystones correspond to committed-but-unopened
|
||||||
// circuits.
|
// circuits.
|
||||||
cm.mtx.RLock()
|
cm.mtx.RLock()
|
||||||
|
@ -658,8 +678,7 @@ func (cm *circuitMap) addCircuitToHashIndex(c *PaymentCircuit) {
|
||||||
|
|
||||||
// FailCircuit marks the circuit identified by `inKey` as closing in-memory,
|
// FailCircuit marks the circuit identified by `inKey` as closing in-memory,
|
||||||
// which prevents duplicate settles/fails from completing an open circuit twice.
|
// which prevents duplicate settles/fails from completing an open circuit twice.
|
||||||
func (cm *circuitMap) FailCircuit(
|
func (cm *circuitMap) FailCircuit(inKey CircuitKey) (*PaymentCircuit, error) {
|
||||||
inKey CircuitKey) (*PaymentCircuit, error) {
|
|
||||||
|
|
||||||
cm.mtx.Lock()
|
cm.mtx.Lock()
|
||||||
defer cm.mtx.Unlock()
|
defer cm.mtx.Unlock()
|
||||||
|
@ -679,11 +698,10 @@ func (cm *circuitMap) FailCircuit(
|
||||||
return circuit, nil
|
return circuit, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// CloseCircuit marks the circuit identified by `outKey` as closing
|
// CloseCircuit marks the circuit identified by `outKey` as closing in-memory,
|
||||||
// in-memory, which prevents duplicate settles/fails from completing an open
|
// which prevents duplicate settles/fails from completing an open
|
||||||
// circuit twice.
|
// circuit twice.
|
||||||
func (cm *circuitMap) CloseCircuit(
|
func (cm *circuitMap) CloseCircuit(outKey CircuitKey) (*PaymentCircuit, error) {
|
||||||
outKey CircuitKey) (*PaymentCircuit, error) {
|
|
||||||
|
|
||||||
cm.mtx.Lock()
|
cm.mtx.Lock()
|
||||||
defer cm.mtx.Unlock()
|
defer cm.mtx.Unlock()
|
||||||
|
@ -709,6 +727,10 @@ func (cm *circuitMap) CloseCircuit(
|
||||||
// circuit key.
|
// circuit key.
|
||||||
func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
|
func (cm *circuitMap) DeleteCircuits(inKeys ...CircuitKey) error {
|
||||||
|
|
||||||
|
log.Tracef("Deleting resolved circuits: %v", newLogClosure(func() string {
|
||||||
|
return spew.Sdump(inKeys)
|
||||||
|
}))
|
||||||
|
|
||||||
var (
|
var (
|
||||||
closingCircuits = make(map[CircuitKey]struct{})
|
closingCircuits = make(map[CircuitKey]struct{})
|
||||||
removedCircuits = make(map[CircuitKey]*PaymentCircuit)
|
removedCircuits = make(map[CircuitKey]*PaymentCircuit)
|
||||||
|
|
|
@ -9,6 +9,7 @@ import (
|
||||||
|
|
||||||
"crypto/sha256"
|
"crypto/sha256"
|
||||||
|
|
||||||
|
"github.com/davecgh/go-spew/spew"
|
||||||
"github.com/go-errors/errors"
|
"github.com/go-errors/errors"
|
||||||
"github.com/lightningnetwork/lnd/chainntnfs"
|
"github.com/lightningnetwork/lnd/chainntnfs"
|
||||||
"github.com/lightningnetwork/lnd/channeldb"
|
"github.com/lightningnetwork/lnd/channeldb"
|
||||||
|
@ -140,8 +141,9 @@ type ChannelLinkConfig struct {
|
||||||
|
|
||||||
// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
|
// DecodeHopIterators facilitates batched decoding of HTLC Sphinx onion
|
||||||
// blobs, which are then used to inform how to forward an HTLC.
|
// blobs, which are then used to inform how to forward an HTLC.
|
||||||
// NOTE: This function assumes the same set of readers and preimages are
|
//
|
||||||
// always presented for the same identifier.
|
// NOTE: This function assumes the same set of readers and preimages
|
||||||
|
// are always presented for the same identifier.
|
||||||
DecodeHopIterators func([]byte, []DecodeHopIteratorRequest) (
|
DecodeHopIterators func([]byte, []DecodeHopIteratorRequest) (
|
||||||
[]DecodeHopIteratorResponse, error)
|
[]DecodeHopIteratorResponse, error)
|
||||||
|
|
||||||
|
@ -210,14 +212,15 @@ type ChannelLinkConfig struct {
|
||||||
|
|
||||||
// BatchTicker is the ticker that determines the interval that we'll
|
// BatchTicker is the ticker that determines the interval that we'll
|
||||||
// use to check the batch to see if there're any updates we should
|
// use to check the batch to see if there're any updates we should
|
||||||
// flush out. By batching updates into a single commit, we attempt
|
// flush out. By batching updates into a single commit, we attempt to
|
||||||
// to increase throughput by maximizing the number of updates
|
// increase throughput by maximizing the number of updates coalesced
|
||||||
// coalesced into a single commit.
|
// into a single commit.
|
||||||
BatchTicker Ticker
|
BatchTicker Ticker
|
||||||
|
|
||||||
// FwdPkgGCTicker is the ticker determining the frequency at which
|
// FwdPkgGCTicker is the ticker determining the frequency at which
|
||||||
// garbage collection of forwarding packages occurs. We use a time-based
|
// garbage collection of forwarding packages occurs. We use a
|
||||||
// approach, as opposed to block epochs, as to not hinder syncing.
|
// time-based approach, as opposed to block epochs, as to not hinder
|
||||||
|
// syncing.
|
||||||
FwdPkgGCTicker Ticker
|
FwdPkgGCTicker Ticker
|
||||||
|
|
||||||
// BatchSize is the max size of a batch of updates done to the link
|
// BatchSize is the max size of a batch of updates done to the link
|
||||||
|
@ -232,8 +235,8 @@ type ChannelLinkConfig struct {
|
||||||
}
|
}
|
||||||
|
|
||||||
// channelLink is the service which drives a channel's commitment update
|
// channelLink is the service which drives a channel's commitment update
|
||||||
// state-machine. In the event that an htlc needs to be propagated to another
|
// state-machine. In the event that an HTLC needs to be propagated to another
|
||||||
// link, the forward handler from config is used which sends htlc to the
|
// link, the forward handler from config is used which sends HTLC to the
|
||||||
// switch. Additionally, the link encapsulate logic of commitment protocol
|
// switch. Additionally, the link encapsulate logic of commitment protocol
|
||||||
// message ordering and updates.
|
// message ordering and updates.
|
||||||
type channelLink struct {
|
type channelLink struct {
|
||||||
|
@ -246,8 +249,8 @@ type channelLink struct {
|
||||||
// current number of settles that have been sent, but not yet committed
|
// current number of settles that have been sent, but not yet committed
|
||||||
// to the commitment.
|
// to the commitment.
|
||||||
//
|
//
|
||||||
// TODO(andrew.shvv) remove after we add additional
|
// TODO(andrew.shvv) remove after we add additional BatchNumber()
|
||||||
// BatchNumber() method in state machine.
|
// method in state machine.
|
||||||
batchCounter uint32
|
batchCounter uint32
|
||||||
|
|
||||||
// bestHeight is the best known height of the main chain. The link will
|
// bestHeight is the best known height of the main chain. The link will
|
||||||
|
@ -255,10 +258,22 @@ type channelLink struct {
|
||||||
bestHeight uint32
|
bestHeight uint32
|
||||||
|
|
||||||
// keystoneBatch represents a volatile list of keystones that must be
|
// keystoneBatch represents a volatile list of keystones that must be
|
||||||
// written before attempting to sign the next commitment txn.
|
// written before attempting to sign the next commitment txn. These
|
||||||
|
// represent all the HTLC's forwarded to the link from the switch. Once
|
||||||
|
// we lock them into our outgoing commitment, then the circuit has a
|
||||||
|
// keystone, and is fully opened.
|
||||||
keystoneBatch []Keystone
|
keystoneBatch []Keystone
|
||||||
|
|
||||||
|
// openedCircuits is the set of all payment circuits that will be open
|
||||||
|
// once we make our next commitment. After making the commitment we'll
|
||||||
|
// ACK all these from our mailbox to ensure that they don't get
|
||||||
|
// re-delivered if we reconnect.
|
||||||
openedCircuits []CircuitKey
|
openedCircuits []CircuitKey
|
||||||
|
|
||||||
|
// closedCircuits is the set of all payment circuits that will be
|
||||||
|
// closed once we make our next commitment. After taking the commitment
|
||||||
|
// we'll ACK all these to ensure that they don't get re-delivered if we
|
||||||
|
// reconnect.
|
||||||
closedCircuits []CircuitKey
|
closedCircuits []CircuitKey
|
||||||
|
|
||||||
// channel is a lightning network channel to which we apply htlc
|
// channel is a lightning network channel to which we apply htlc
|
||||||
|
@ -526,9 +541,9 @@ func (l *channelLink) syncChanStates() error {
|
||||||
closedCircuits []CircuitKey
|
closedCircuits []CircuitKey
|
||||||
)
|
)
|
||||||
|
|
||||||
// We've just received a ChnSync message from the remote party,
|
// We've just received a ChanSync message from the remote
|
||||||
// so we'll process the message in order to determine if we
|
// party, so we'll process the message in order to determine
|
||||||
// need to re-transmit any messages to the remote party.
|
// if we need to re-transmit any messages to the remote party.
|
||||||
msgsToReSend, openedCircuits, closedCircuits, err =
|
msgsToReSend, openedCircuits, closedCircuits, err =
|
||||||
l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
|
l.channel.ProcessChanSyncMsg(remoteChanSyncMsg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -539,7 +554,8 @@ func (l *channelLink) syncChanStates() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Repopulate any identifiers for circuits that may have been
|
// Repopulate any identifiers for circuits that may have been
|
||||||
// opened or unclosed.
|
// opened or unclosed. This may happen if we needed to
|
||||||
|
// retransmit a commitment signature message.
|
||||||
l.openedCircuits = openedCircuits
|
l.openedCircuits = openedCircuits
|
||||||
l.closedCircuits = closedCircuits
|
l.closedCircuits = closedCircuits
|
||||||
|
|
||||||
|
@ -574,10 +590,10 @@ func (l *channelLink) syncChanStates() error {
|
||||||
}
|
}
|
||||||
|
|
||||||
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
|
// resolveFwdPkgs loads any forwarding packages for this link from disk, and
|
||||||
// reprocesses them in order. The primary goal is to make sure that any HTLCs we
|
// reprocesses them in order. The primary goal is to make sure that any HTLCs
|
||||||
// previously received are reinstated in memory, and forwarded to the switch if
|
// we previously received are reinstated in memory, and forwarded to the switch
|
||||||
// necessary. After a restart, this will also delete any previously completed
|
// if necessary. After a restart, this will also delete any previously
|
||||||
// packages.
|
// completed packages.
|
||||||
func (l *channelLink) resolveFwdPkgs() error {
|
func (l *channelLink) resolveFwdPkgs() error {
|
||||||
fwdPkgs, err := l.channel.LoadFwdPkgs()
|
fwdPkgs, err := l.channel.LoadFwdPkgs()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -623,10 +639,10 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise this is either a new package or one has gone through
|
// Otherwise this is either a new package or one has gone through
|
||||||
// processing, but contains htlcs that need to be restored in memory. We
|
// processing, but contains htlcs that need to be restored in memory.
|
||||||
// replay this forwarding package to make sure our local mem state is
|
// We replay this forwarding package to make sure our local mem state
|
||||||
// resurrected, we mimic any original responses back to the remote
|
// is resurrected, we mimic any original responses back to the remote
|
||||||
// party, and reforward the relevant HTLCs to the switch.
|
// party, and re-forward the relevant HTLCs to the switch.
|
||||||
|
|
||||||
// If the package is fully acked but not completed, it must still have
|
// If the package is fully acked but not completed, it must still have
|
||||||
// settles and fails to propagate.
|
// settles and fails to propagate.
|
||||||
|
@ -637,10 +653,10 @@ func (l *channelLink) resolveFwdPkg(fwdPkg *channeldb.FwdPkg) (bool, error) {
|
||||||
l.processRemoteSettleFails(fwdPkg, settleFails)
|
l.processRemoteSettleFails(fwdPkg, settleFails)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Finally, replay *ALL ADDS* in this forwarding package. The downstream
|
// Finally, replay *ALL ADDS* in this forwarding package. The
|
||||||
// logic is able to filter out any duplicates, but we must shove the
|
// downstream logic is able to filter out any duplicates, but we must
|
||||||
// entire, original set of adds down the pipeline so that the batch of
|
// shove the entire, original set of adds down the pipeline so that the
|
||||||
// adds presented to the sphinx router does not ever change.
|
// batch of adds presented to the sphinx router does not ever change.
|
||||||
var needUpdate bool
|
var needUpdate bool
|
||||||
if !fwdPkg.AckFilter.IsFull() {
|
if !fwdPkg.AckFilter.IsFull() {
|
||||||
adds := lnwallet.PayDescsFromRemoteLogUpdates(
|
adds := lnwallet.PayDescsFromRemoteLogUpdates(
|
||||||
|
@ -717,10 +733,10 @@ func (l *channelLink) htlcManager() {
|
||||||
// commitment txn. We use the next local htlc index as the cut off
|
// commitment txn. We use the next local htlc index as the cut off
|
||||||
// point, since all indexes below that are committed.
|
// point, since all indexes below that are committed.
|
||||||
//
|
//
|
||||||
// NOTE: This is automatically done by the switch when it starts up, but
|
// NOTE: This is automatically done by the switch when it starts up,
|
||||||
// is necessary to prevent inconsistencies in the case that the link
|
// but is necessary to prevent inconsistencies in the case that the
|
||||||
// flaps. This is a result of a link's life-cycle being shorter than
|
// link flaps. This is a result of a link's life-cycle being shorter
|
||||||
// that of the switch.
|
// than that of the switch.
|
||||||
localHtlcIndex := l.channel.LocalHtlcIndex()
|
localHtlcIndex := l.channel.LocalHtlcIndex()
|
||||||
err := l.cfg.Circuits.TrimOpenCircuits(l.ShortChanID(), localHtlcIndex)
|
err := l.cfg.Circuits.TrimOpenCircuits(l.ShortChanID(), localHtlcIndex)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -737,7 +753,6 @@ func (l *channelLink) htlcManager() {
|
||||||
// re-synchronize state with the remote peer. settledHtlcs is a map of
|
// re-synchronize state with the remote peer. settledHtlcs is a map of
|
||||||
// HTLC's that we re-settled as part of the channel state sync.
|
// HTLC's that we re-settled as part of the channel state sync.
|
||||||
if l.cfg.SyncStates {
|
if l.cfg.SyncStates {
|
||||||
// TODO(roasbeef): need to ensure haven't already settled?
|
|
||||||
if err := l.syncChanStates(); err != nil {
|
if err := l.syncChanStates(); err != nil {
|
||||||
l.errorf("unable to synchronize channel states: %v", err)
|
l.errorf("unable to synchronize channel states: %v", err)
|
||||||
l.fail(err.Error())
|
l.fail(err.Error())
|
||||||
|
@ -745,9 +760,9 @@ func (l *channelLink) htlcManager() {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// With the channel states synced, we now reset the mailbox to ensure we
|
// With the channel states synced, we now reset the mailbox to ensure
|
||||||
// start processing all unacked packets in order. This is done here to
|
// we start processing all unacked packets in order. This is done here
|
||||||
// ensure that all acknowledgments that occur during channel
|
// to ensure that all acknowledgments that occur during channel
|
||||||
// resynchronization have taken affect, causing us only to pull unacked
|
// resynchronization have taken affect, causing us only to pull unacked
|
||||||
// packets after starting to read from the downstream mailbox.
|
// packets after starting to read from the downstream mailbox.
|
||||||
l.mailBox.ResetPackets()
|
l.mailBox.ResetPackets()
|
||||||
|
@ -827,7 +842,8 @@ out:
|
||||||
// TODO(roasbeef): remove all together
|
// TODO(roasbeef): remove all together
|
||||||
go func() {
|
go func() {
|
||||||
chanPoint := l.channel.ChannelPoint()
|
chanPoint := l.channel.ChannelPoint()
|
||||||
if err := l.cfg.Peer.WipeChannel(chanPoint); err != nil {
|
err := l.cfg.Peer.WipeChannel(chanPoint)
|
||||||
|
if err != nil {
|
||||||
log.Errorf("unable to wipe channel %v", err)
|
log.Errorf("unable to wipe channel %v", err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -895,6 +911,7 @@ out:
|
||||||
l.overflowQueue.AddPkt(pkt)
|
l.overflowQueue.AddPkt(pkt)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
l.handleDownStreamPkt(pkt, false)
|
l.handleDownStreamPkt(pkt, false)
|
||||||
|
|
||||||
// A message from the connected peer was just received. This
|
// A message from the connected peer was just received. This
|
||||||
|
@ -956,7 +973,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||||
// The channels spare bandwidth is fully allocated, so
|
// The channels spare bandwidth is fully allocated, so
|
||||||
// we'll put this HTLC into the overflow queue.
|
// we'll put this HTLC into the overflow queue.
|
||||||
case lnwallet.ErrMaxHTLCNumber:
|
case lnwallet.ErrMaxHTLCNumber:
|
||||||
log.Infof("Downstream htlc add update with "+
|
l.infof("Downstream htlc add update with "+
|
||||||
"payment hash(%x) have been added to "+
|
"payment hash(%x) have been added to "+
|
||||||
"reprocessing queue, batch: %v",
|
"reprocessing queue, batch: %v",
|
||||||
htlc.PaymentHash[:],
|
htlc.PaymentHash[:],
|
||||||
|
@ -969,7 +986,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||||
// machine, as a result, we'll signal the switch to
|
// machine, as a result, we'll signal the switch to
|
||||||
// cancel the pending payment.
|
// cancel the pending payment.
|
||||||
default:
|
default:
|
||||||
log.Warnf("Unable to handle downstream add HTLC: %v", err)
|
l.warnf("Unable to handle downstream add HTLC: %v", err)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
localFailure = false
|
localFailure = false
|
||||||
|
@ -984,7 +1001,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
err := lnwire.EncodeFailure(&b, failure, 0)
|
err := lnwire.EncodeFailure(&b, failure, 0)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to encode failure: %v", err)
|
l.errorf("unable to encode failure: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
reason = lnwire.OpaqueReason(b.Bytes())
|
reason = lnwire.OpaqueReason(b.Bytes())
|
||||||
|
@ -993,7 +1010,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||||
var err error
|
var err error
|
||||||
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
|
reason, err = pkt.obfuscator.EncryptFirstHop(failure)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to obfuscate error: %v", err)
|
l.errorf("unable to obfuscate error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1027,7 +1044,7 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Tracef("Received downstream htlc: payment_hash=%x, "+
|
l.tracef("Received downstream htlc: payment_hash=%x, "+
|
||||||
"local_log_index=%v, batch_size=%v",
|
"local_log_index=%v, batch_size=%v",
|
||||||
htlc.PaymentHash[:], index, l.batchCounter+1)
|
htlc.PaymentHash[:], index, l.batchCounter+1)
|
||||||
|
|
||||||
|
@ -1047,7 +1064,6 @@ func (l *channelLink) handleDownStreamPkt(pkt *htlcPacket, isReProcess bool) {
|
||||||
// An HTLC we forward to the switch has just settled somewhere
|
// An HTLC we forward to the switch has just settled somewhere
|
||||||
// upstream. Therefore we settle the HTLC within the our local
|
// upstream. Therefore we settle the HTLC within the our local
|
||||||
// state machine.
|
// state machine.
|
||||||
|
|
||||||
closedCircuitRef := pkt.inKey()
|
closedCircuitRef := pkt.inKey()
|
||||||
if err := l.channel.SettleHTLC(
|
if err := l.channel.SettleHTLC(
|
||||||
htlc.PaymentPreimage,
|
htlc.PaymentPreimage,
|
||||||
|
@ -1138,7 +1154,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Tracef("Receive upstream htlc with payment hash(%x), "+
|
l.tracef("Receive upstream htlc with payment hash(%x), "+
|
||||||
"assigning index: %v", msg.PaymentHash[:], index)
|
"assigning index: %v", msg.PaymentHash[:], index)
|
||||||
|
|
||||||
case *lnwire.UpdateFulfillHTLC:
|
case *lnwire.UpdateFulfillHTLC:
|
||||||
|
@ -1159,7 +1175,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||||
go func() {
|
go func() {
|
||||||
err := l.cfg.PreimageCache.AddPreimage(pre[:])
|
err := l.cfg.PreimageCache.AddPreimage(pre[:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("unable to add preimage=%x to "+
|
l.errorf("unable to add preimage=%x to "+
|
||||||
"cache", pre[:])
|
"cache", pre[:])
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -1191,7 +1207,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||||
// form.
|
// form.
|
||||||
var b bytes.Buffer
|
var b bytes.Buffer
|
||||||
if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
|
if err := lnwire.EncodeFailure(&b, failure, 0); err != nil {
|
||||||
log.Errorf("unable to encode malformed error: %v", err)
|
l.errorf("unable to encode malformed error: %v", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1330,12 +1346,12 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// ackDownStreamPackets is responsible for removing htlcs from a link's
|
// ackDownStreamPackets is responsible for removing htlcs from a link's mailbox
|
||||||
// mailbox for packets delivered from server, and cleaning up any circuits
|
// for packets delivered from server, and cleaning up any circuits closed by
|
||||||
// closed by signing a previous commitment txn. This method ensures that the
|
// signing a previous commitment txn. This method ensures that the circuits are
|
||||||
// circuits are removed from the circuit map before removing them from the
|
// removed from the circuit map before removing them from the link's mailbox,
|
||||||
// link's mailbox, otherwise it could be possible for some circuit to be missed
|
// otherwise it could be possible for some circuit to be missed if this link
|
||||||
// if this link flaps.
|
// flaps.
|
||||||
//
|
//
|
||||||
// The `forgive` flag allows this method to tolerate restarts, and ignores
|
// The `forgive` flag allows this method to tolerate restarts, and ignores
|
||||||
// errors that could be caused by a previous circuit deletion. Under normal
|
// errors that could be caused by a previous circuit deletion. Under normal
|
||||||
|
@ -1346,15 +1362,15 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error {
|
||||||
// previous commitment signature. This will prevent the Adds from being
|
// previous commitment signature. This will prevent the Adds from being
|
||||||
// replayed if this link disconnects.
|
// replayed if this link disconnects.
|
||||||
for _, inKey := range l.openedCircuits {
|
for _, inKey := range l.openedCircuits {
|
||||||
// In order to test the sphinx replay logic of the remote party,
|
// In order to test the sphinx replay logic of the remote
|
||||||
// unsafe replay does not acknowledge the packets from the
|
// party, unsafe replay does not acknowledge the packets from
|
||||||
// mailbox. We can then force a replay of any Add packets held
|
// the mailbox. We can then force a replay of any Add packets
|
||||||
// in memory by disconnecting and reconnecting the link.
|
// held in memory by disconnecting and reconnecting the link.
|
||||||
if l.cfg.UnsafeReplay {
|
if l.cfg.UnsafeReplay {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
l.debugf("Removing Add packet %s from mailbox", inKey)
|
l.debugf("removing Add packet %s from mailbox", inKey)
|
||||||
l.mailBox.AckPacket(inKey)
|
l.mailBox.AckPacket(inKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1370,11 +1386,11 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error {
|
||||||
case ErrUnknownCircuit:
|
case ErrUnknownCircuit:
|
||||||
if forgive {
|
if forgive {
|
||||||
// After a restart, we may have already removed this
|
// After a restart, we may have already removed this
|
||||||
// circuit. Since it shouldn't be possible for a circuit
|
// circuit. Since it shouldn't be possible for a
|
||||||
// to be closed by different htlcs, we assume this error
|
// circuit to be closed by different htlcs, we assume
|
||||||
// signals that the whole batch was successfully
|
// this error signals that the whole batch was
|
||||||
// removed.
|
// successfully removed.
|
||||||
l.warnf("Forgiving unknown circuit error after " +
|
l.warnf("forgiving unknown circuit error after " +
|
||||||
"attempting deletion, circuit was probably " +
|
"attempting deletion, circuit was probably " +
|
||||||
"removed before shutting down.")
|
"removed before shutting down.")
|
||||||
break
|
break
|
||||||
|
@ -1392,9 +1408,9 @@ func (l *channelLink) ackDownStreamPackets(forgive bool) error {
|
||||||
// Settle/Fails in the mailbox to ensure they do not get redelivered
|
// Settle/Fails in the mailbox to ensure they do not get redelivered
|
||||||
// after startup. If forgive is enabled and we've reached this point,
|
// after startup. If forgive is enabled and we've reached this point,
|
||||||
// the circuits must have been removed at some point, so it is now safe
|
// the circuits must have been removed at some point, so it is now safe
|
||||||
// to unqueue the corresponding Settle/Fails.
|
// to un-queue the corresponding Settle/Fails.
|
||||||
for _, inKey := range l.closedCircuits {
|
for _, inKey := range l.closedCircuits {
|
||||||
l.debugf("Removing Fail/Settle packet %s from mailbox", inKey)
|
l.debugf("removing Fail/Settle packet %s from mailbox", inKey)
|
||||||
l.mailBox.AckPacket(inKey)
|
l.mailBox.AckPacket(inKey)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -1526,24 +1542,29 @@ type getBandwidthCmd struct {
|
||||||
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
|
func (l *channelLink) Bandwidth() lnwire.MilliSatoshi {
|
||||||
channelBandwidth := l.channel.AvailableBalance()
|
channelBandwidth := l.channel.AvailableBalance()
|
||||||
overflowBandwidth := l.overflowQueue.TotalHtlcAmount()
|
overflowBandwidth := l.overflowQueue.TotalHtlcAmount()
|
||||||
linkBandwidth := channelBandwidth - overflowBandwidth
|
|
||||||
reserve := lnwire.NewMSatFromSatoshis(l.channel.LocalChanReserve())
|
|
||||||
|
|
||||||
// If the channel reserve is greater than the total available
|
// To compute the total bandwidth, we'll take the current available
|
||||||
// balance of the link, just return 0.
|
// bandwidth, then subtract the overflow bandwidth as we'll eventually
|
||||||
|
// also need to evaluate those HTLC's once space on the commitment
|
||||||
|
// transaction is free.
|
||||||
|
linkBandwidth := channelBandwidth - overflowBandwidth
|
||||||
|
|
||||||
|
// If the channel reserve is greater than the total available balance
|
||||||
|
// of the link, just return 0.
|
||||||
|
reserve := lnwire.NewMSatFromSatoshis(l.channel.LocalChanReserve())
|
||||||
if linkBandwidth < reserve {
|
if linkBandwidth < reserve {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
// Else the amount that is available to flow through the link at
|
// Else the amount that is available to flow through the link at this
|
||||||
// this point is the available balance minus the reserve amount
|
// point is the available balance minus the reserve amount we are
|
||||||
// we are required to keep as collateral.
|
// required to keep as collateral.
|
||||||
return linkBandwidth - reserve
|
return linkBandwidth - reserve
|
||||||
}
|
}
|
||||||
|
|
||||||
// AttachMailBox updates the current mailbox used by this link, and hooks up the
|
// AttachMailBox updates the current mailbox used by this link, and hooks up
|
||||||
// mailbox's message and packet outboxes to the link's upstream and downstream
|
// the mailbox's message and packet outboxes to the link's upstream and
|
||||||
// chans, respectively.
|
// downstream chans, respectively.
|
||||||
func (l *channelLink) AttachMailBox(mailbox MailBox) {
|
func (l *channelLink) AttachMailBox(mailbox MailBox) {
|
||||||
l.Lock()
|
l.Lock()
|
||||||
l.mailBox = mailbox
|
l.mailBox = mailbox
|
||||||
|
@ -1611,6 +1632,7 @@ func (l *channelLink) String() string {
|
||||||
func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error {
|
func (l *channelLink) HandleSwitchPacket(pkt *htlcPacket) error {
|
||||||
l.tracef("received switch packet inkey=%v, outkey=%v",
|
l.tracef("received switch packet inkey=%v, outkey=%v",
|
||||||
pkt.inKey(), pkt.outKey())
|
pkt.inKey(), pkt.outKey())
|
||||||
|
|
||||||
l.mailBox.AddPacket(pkt)
|
l.mailBox.AddPacket(pkt)
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -1669,12 +1691,16 @@ func (l *channelLink) processRemoteSettleFails(fwdPkg *channeldb.FwdPkg,
|
||||||
|
|
||||||
var switchPackets []*htlcPacket
|
var switchPackets []*htlcPacket
|
||||||
for i, pd := range settleFails {
|
for i, pd := range settleFails {
|
||||||
// Skip any settles or fails that have already been acknowledged
|
// Skip any settles or fails that have already been
|
||||||
// by the incoming link that originated the forwarded Add.
|
// acknowledged by the incoming link that originated the
|
||||||
|
// forwarded Add.
|
||||||
if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
|
if fwdPkg.SettleFailFilter.Contains(uint16(i)) {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(roasbeef): rework log entries to a shared
|
||||||
|
// interface.
|
||||||
|
|
||||||
switch pd.EntryType {
|
switch pd.EntryType {
|
||||||
|
|
||||||
// A settle for an HTLC we previously forwarded HTLC has been
|
// A settle for an HTLC we previously forwarded HTLC has been
|
||||||
|
@ -1759,8 +1785,8 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
||||||
|
|
||||||
// Atomically decode the incoming htlcs, simultaneously checking for
|
// Atomically decode the incoming htlcs, simultaneously checking for
|
||||||
// replay attempts. A particular index in the returned, spare list of
|
// replay attempts. A particular index in the returned, spare list of
|
||||||
// channel iterators should only be used if the failure code at the same
|
// channel iterators should only be used if the failure code at the
|
||||||
// index is lnwire.FailCodeNone.
|
// same index is lnwire.FailCodeNone.
|
||||||
decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
|
decodeResps, sphinxErr := l.cfg.DecodeHopIterators(
|
||||||
fwdPkg.ID(), decodeReqs,
|
fwdPkg.ID(), decodeReqs,
|
||||||
)
|
)
|
||||||
|
@ -1781,11 +1807,11 @@ func (l *channelLink) processRemoteAdds(fwdPkg *channeldb.FwdPkg,
|
||||||
if fwdPkg.State == channeldb.FwdStateProcessed &&
|
if fwdPkg.State == channeldb.FwdStateProcessed &&
|
||||||
fwdPkg.AckFilter.Contains(idx) {
|
fwdPkg.AckFilter.Contains(idx) {
|
||||||
|
|
||||||
// If this index is already found in the ack filter, the
|
// If this index is already found in the ack filter,
|
||||||
// response to this forwarding decision has already been
|
// the response to this forwarding decision has already
|
||||||
// committed by one of our commitment txns. ADDs in this
|
// been committed by one of our commitment txns. ADDs
|
||||||
// state are waiting for the rest of the fwding package
|
// in this state are waiting for the rest of the fwding
|
||||||
// to get acked before being garbage collected.
|
// package to get acked before being garbage collected.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2313,14 +2339,14 @@ func (l *channelLink) forwardBatch(packets ...*htlcPacket) {
|
||||||
l.handleBatchFwdErrs(errChan)
|
l.handleBatchFwdErrs(errChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
// handleBatchFwdErrs waits on the given errChan until it is closed, logging the
|
// handleBatchFwdErrs waits on the given errChan until it is closed, logging
|
||||||
// errors returned from any unsuccessful forwarding attempts.
|
// the errors returned from any unsuccessful forwarding attempts.
|
||||||
func (l *channelLink) handleBatchFwdErrs(errChan chan error) {
|
func (l *channelLink) handleBatchFwdErrs(errChan chan error) {
|
||||||
for {
|
for {
|
||||||
err, ok := <-errChan
|
err, ok := <-errChan
|
||||||
if !ok {
|
if !ok {
|
||||||
// Err chan has been drained or switch is shutting down.
|
// Err chan has been drained or switch is shutting
|
||||||
// Either way, return.
|
// down. Either way, return.
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2335,9 +2361,8 @@ func (l *channelLink) handleBatchFwdErrs(errChan chan error) {
|
||||||
|
|
||||||
// sendHTLCError functions cancels HTLC and send cancel message back to the
|
// sendHTLCError functions cancels HTLC and send cancel message back to the
|
||||||
// peer from which HTLC was received.
|
// peer from which HTLC was received.
|
||||||
func (l *channelLink) sendHTLCError(htlcIndex uint64,
|
func (l *channelLink) sendHTLCError(htlcIndex uint64, failure lnwire.FailureMessage,
|
||||||
failure lnwire.FailureMessage, e ErrorEncrypter,
|
e ErrorEncrypter, sourceRef *channeldb.AddRef) {
|
||||||
sourceRef *channeldb.AddRef) {
|
|
||||||
|
|
||||||
reason, err := e.EncryptFirstHop(failure)
|
reason, err := e.EncryptFirstHop(failure)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -24,3 +24,19 @@ func DisableLog() {
|
||||||
func UseLogger(logger btclog.Logger) {
|
func UseLogger(logger btclog.Logger) {
|
||||||
log = logger
|
log = logger
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// logClosure is used to provide a closure over expensive logging operations so
|
||||||
|
// don't have to be performed when the logging level doesn't warrant it.
|
||||||
|
type logClosure func() string
|
||||||
|
|
||||||
|
// String invokes the underlying function and returns the result.
|
||||||
|
func (c logClosure) String() string {
|
||||||
|
return c()
|
||||||
|
}
|
||||||
|
|
||||||
|
// newLogClosure returns a new closure over a function that returns a string
|
||||||
|
// which itself provides a Stringer interface so that it can be used with the
|
||||||
|
// logging system.
|
||||||
|
func newLogClosure(c func() string) logClosure {
|
||||||
|
return logClosure(c)
|
||||||
|
}
|
||||||
|
|
|
@ -247,6 +247,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
||||||
select {
|
select {
|
||||||
case msgDone := <-m.msgReset:
|
case msgDone := <-m.msgReset:
|
||||||
m.wireMessages.Init()
|
m.wireMessages.Init()
|
||||||
|
|
||||||
close(msgDone)
|
close(msgDone)
|
||||||
case <-m.quit:
|
case <-m.quit:
|
||||||
m.wireCond.L.Unlock()
|
m.wireCond.L.Unlock()
|
||||||
|
@ -261,8 +262,13 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
||||||
m.pktCond.Wait()
|
m.pktCond.Wait()
|
||||||
|
|
||||||
select {
|
select {
|
||||||
|
// Resetting the packet queue means just moving
|
||||||
|
// our pointer to the front. This ensures that
|
||||||
|
// any un-ACK'd messages are re-delivered upon
|
||||||
|
// reconnect.
|
||||||
case pktDone := <-m.pktReset:
|
case pktDone := <-m.pktReset:
|
||||||
m.pktHead = m.htlcPkts.Front()
|
m.pktHead = m.htlcPkts.Front()
|
||||||
|
|
||||||
close(pktDone)
|
close(pktDone)
|
||||||
case <-m.quit:
|
case <-m.quit:
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
|
@ -272,17 +278,22 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Grab the datum off the front of the queue, shifting the
|
|
||||||
// slice's reference down one in order to remove the datum from
|
|
||||||
// the queue.
|
|
||||||
var (
|
var (
|
||||||
nextPkt *htlcPacket
|
nextPkt *htlcPacket
|
||||||
nextMsg lnwire.Message
|
nextMsg lnwire.Message
|
||||||
)
|
)
|
||||||
switch cType {
|
switch cType {
|
||||||
|
// Grab the datum off the front of the queue, shifting the
|
||||||
|
// slice's reference down one in order to remove the datum from
|
||||||
|
// the queue.
|
||||||
case wireCourier:
|
case wireCourier:
|
||||||
entry := m.wireMessages.Front()
|
entry := m.wireMessages.Front()
|
||||||
nextMsg = m.wireMessages.Remove(entry).(lnwire.Message)
|
nextMsg = m.wireMessages.Remove(entry).(lnwire.Message)
|
||||||
|
|
||||||
|
// For packets, we actually never remove an item until it has
|
||||||
|
// been ACK'd by the link. This ensures that if a read packet
|
||||||
|
// doesn't make it into a commitment, then it'll be
|
||||||
|
// re-delivered once the link comes back online.
|
||||||
case pktCourier:
|
case pktCourier:
|
||||||
nextPkt = m.pktHead.Value.(*htlcPacket)
|
nextPkt = m.pktHead.Value.(*htlcPacket)
|
||||||
m.pktHead = m.pktHead.Next()
|
m.pktHead = m.pktHead.Next()
|
||||||
|
@ -297,6 +308,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// With the next message obtained, we'll now select to attempt
|
||||||
// to deliver the message. If we receive a kill signal, then
|
// to deliver the message. If we receive a kill signal, then
|
||||||
// we'll bail out.
|
// we'll bail out.
|
||||||
switch cType {
|
switch cType {
|
||||||
|
@ -307,6 +319,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
||||||
m.wireCond.L.Lock()
|
m.wireCond.L.Lock()
|
||||||
m.wireMessages.Init()
|
m.wireMessages.Init()
|
||||||
m.wireCond.L.Unlock()
|
m.wireCond.L.Unlock()
|
||||||
|
|
||||||
close(msgDone)
|
close(msgDone)
|
||||||
case <-m.quit:
|
case <-m.quit:
|
||||||
return
|
return
|
||||||
|
@ -319,6 +332,7 @@ func (m *memoryMailBox) mailCourier(cType courierType) {
|
||||||
m.pktCond.L.Lock()
|
m.pktCond.L.Lock()
|
||||||
m.pktHead = m.htlcPkts.Front()
|
m.pktHead = m.htlcPkts.Front()
|
||||||
m.pktCond.L.Unlock()
|
m.pktCond.L.Unlock()
|
||||||
|
|
||||||
close(pktDone)
|
close(pktDone)
|
||||||
case <-m.quit:
|
case <-m.quit:
|
||||||
return
|
return
|
||||||
|
|
|
@ -346,8 +346,8 @@ func (s *Switch) SendHTLC(nextNode [33]byte, htlc *lnwire.UpdateAddHTLC,
|
||||||
"while waiting for payment result")
|
"while waiting for payment result")
|
||||||
}
|
}
|
||||||
|
|
||||||
// Remove circuit since we are about to complete an
|
// Remove circuit since we are about to complete an add/fail of this
|
||||||
// add/fail of this HTLC.
|
// HTLC.
|
||||||
if teardownErr := s.teardownCircuit(response); teardownErr != nil {
|
if teardownErr := s.teardownCircuit(response); teardownErr != nil {
|
||||||
log.Warnf("unable to teardown circuit %s: %v",
|
log.Warnf("unable to teardown circuit %s: %v",
|
||||||
response.inKey(), teardownErr)
|
response.inKey(), teardownErr)
|
||||||
|
@ -409,7 +409,7 @@ type updatePoliciesCmd struct {
|
||||||
// updateLinkPolicies attempts to update the forwarding policies for the set of
|
// updateLinkPolicies attempts to update the forwarding policies for the set of
|
||||||
// passed links identified by their channel points. If a nil set of channel
|
// passed links identified by their channel points. If a nil set of channel
|
||||||
// points is passed, then the forwarding policies for all active links will be
|
// points is passed, then the forwarding policies for all active links will be
|
||||||
// updated.k
|
// updated.
|
||||||
func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error {
|
func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error {
|
||||||
log.Debugf("Updating link policies: %v", spew.Sdump(c))
|
log.Debugf("Updating link policies: %v", spew.Sdump(c))
|
||||||
|
|
||||||
|
@ -440,9 +440,9 @@ func (s *Switch) updateLinkPolicies(c *updatePoliciesCmd) error {
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// forward is used in order to find next channel link and apply htlc
|
// forward is used in order to find next channel link and apply htlc update.
|
||||||
// update. Also this function is used by channel links itself in order to
|
// Also this function is used by channel links itself in order to forward the
|
||||||
// forward the update after it has been included in the channel.
|
// update after it has been included in the channel.
|
||||||
func (s *Switch) forward(packet *htlcPacket) error {
|
func (s *Switch) forward(packet *htlcPacket) error {
|
||||||
switch htlc := packet.htlc.(type) {
|
switch htlc := packet.htlc.(type) {
|
||||||
case *lnwire.UpdateAddHTLC:
|
case *lnwire.UpdateAddHTLC:
|
||||||
|
@ -475,10 +475,11 @@ func (s *Switch) forward(packet *htlcPacket) error {
|
||||||
return s.route(packet)
|
return s.route(packet)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ForwardPackets adds a list of packets to the switch for processing. Fails and
|
// ForwardPackets adds a list of packets to the switch for processing. Fails
|
||||||
// settles are added on a first past, simultaneously constructing circuits for
|
// and settles are added on a first past, simultaneously constructing circuits
|
||||||
// any adds. After persisting the circuits, another pass of the adds is given to
|
// for any adds. After persisting the circuits, another pass of the adds is
|
||||||
// forward them through the router.
|
// given to forward them through the router.
|
||||||
|
//
|
||||||
// NOTE: This method guarantees that the returned err chan will eventually be
|
// NOTE: This method guarantees that the returned err chan will eventually be
|
||||||
// closed. The receiver should read on the channel until receiving such a
|
// closed. The receiver should read on the channel until receiving such a
|
||||||
// signal.
|
// signal.
|
||||||
|
@ -911,7 +912,6 @@ func (s *Switch) handlePacketForward(packet *htlcPacket) error {
|
||||||
return destination.HandleSwitchPacket(packet)
|
return destination.HandleSwitchPacket(packet)
|
||||||
|
|
||||||
case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
|
case *lnwire.UpdateFailHTLC, *lnwire.UpdateFulfillHTLC:
|
||||||
|
|
||||||
// If the source of this packet has not been set, use the
|
// If the source of this packet has not been set, use the
|
||||||
// circuit map to lookup the origin.
|
// circuit map to lookup the origin.
|
||||||
circuit, err := s.closeCircuit(packet)
|
circuit, err := s.closeCircuit(packet)
|
||||||
|
@ -1032,10 +1032,9 @@ func (s *Switch) failAddPacket(packet *htlcPacket,
|
||||||
// set the incoming chan and htlc ID of the given packet if the source was
|
// set the incoming chan and htlc ID of the given packet if the source was
|
||||||
// found, and will properly [re]encrypt any failure messages.
|
// found, and will properly [re]encrypt any failure messages.
|
||||||
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
|
func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
|
||||||
|
// If the packet has its source, that means it was failed locally by
|
||||||
// If the packet has its source, that means it was failed locally by the
|
// the outgoing link. We fail it here to make sure only one response
|
||||||
// outgoing link. We fail it here to make sure only one response makes
|
// makes it through the switch.
|
||||||
// it through the switch.
|
|
||||||
if pkt.hasSource {
|
if pkt.hasSource {
|
||||||
circuit, err := s.circuits.FailCircuit(pkt.inKey())
|
circuit, err := s.circuits.FailCircuit(pkt.inKey())
|
||||||
switch err {
|
switch err {
|
||||||
|
@ -1044,15 +1043,16 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
|
||||||
case nil:
|
case nil:
|
||||||
return circuit, nil
|
return circuit, nil
|
||||||
|
|
||||||
// Circuit was previously closed, but has not been deleted. We'll just
|
// Circuit was previously closed, but has not been deleted.
|
||||||
// drop this response until the circuit has been fully removed.
|
// We'll just drop this response until the circuit has been
|
||||||
|
// fully removed.
|
||||||
case ErrCircuitClosing:
|
case ErrCircuitClosing:
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
||||||
// Failed to close circuit because it does not exist. This is likely
|
// Failed to close circuit because it does not exist. This is
|
||||||
// because the circuit was already successfully closed. Since
|
// likely because the circuit was already successfully closed.
|
||||||
// this packet failed locally, there is no forwarding package
|
// Since this packet failed locally, there is no forwarding
|
||||||
// entry to acknowledge.
|
// package entry to acknowledge.
|
||||||
case ErrUnknownCircuit:
|
case ErrUnknownCircuit:
|
||||||
return nil, err
|
return nil, err
|
||||||
|
|
||||||
|
@ -1062,8 +1062,8 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Otherwise, this is packet was received from the remote party.
|
// Otherwise, this is packet was received from the remote party. Use
|
||||||
// Use circuit map to find the incoming link to receive the settle/fail.
|
// circuit map to find the incoming link to receive the settle/fail.
|
||||||
circuit, err := s.circuits.CloseCircuit(pkt.outKey())
|
circuit, err := s.circuits.CloseCircuit(pkt.outKey())
|
||||||
switch err {
|
switch err {
|
||||||
|
|
||||||
|
@ -1074,6 +1074,16 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
|
||||||
pkt.circuit = circuit
|
pkt.circuit = circuit
|
||||||
pkt.sourceRef = &circuit.AddRef
|
pkt.sourceRef = &circuit.AddRef
|
||||||
|
|
||||||
|
pktType := "SETTLE"
|
||||||
|
if _, ok := pkt.htlc.(*lnwire.UpdateFailHTLC); ok {
|
||||||
|
pktType = "FAIL"
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Debugf("Closed completed %s circuit for %x: "+
|
||||||
|
"(%s, %d) <-> (%s, %d)", pktType, pkt.circuit.PaymentHash,
|
||||||
|
pkt.incomingChanID, pkt.incomingHTLCID,
|
||||||
|
pkt.outgoingChanID, pkt.outgoingHTLCID)
|
||||||
|
|
||||||
return circuit, nil
|
return circuit, nil
|
||||||
|
|
||||||
// Circuit was previously closed, but has not been deleted. We'll just
|
// Circuit was previously closed, but has not been deleted. We'll just
|
||||||
|
@ -1105,6 +1115,10 @@ func (s *Switch) closeCircuit(pkt *htlcPacket) (*PaymentCircuit, error) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ackSettleFail is used by the switch to ACK any settle/fail entries in the
|
||||||
|
// forwarding package of the outgoing link for a payment circuit. We do this if
|
||||||
|
// we're the originator of the payment, so the link stops attempting to
|
||||||
|
// re-broadcast.
|
||||||
func (s *Switch) ackSettleFail(settleFailRef channeldb.SettleFailRef) error {
|
func (s *Switch) ackSettleFail(settleFailRef channeldb.SettleFailRef) error {
|
||||||
return s.cfg.DB.Update(func(tx *bolt.Tx) error {
|
return s.cfg.DB.Update(func(tx *bolt.Tx) error {
|
||||||
return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRef)
|
return s.cfg.SwitchPackager.AckSettleFails(tx, settleFailRef)
|
||||||
|
@ -1149,6 +1163,7 @@ func (s *Switch) teardownCircuit(pkt *htlcPacket) error {
|
||||||
default:
|
default:
|
||||||
log.Debugf("Tearing down incomplete circuit with %s for inkey=%v",
|
log.Debugf("Tearing down incomplete circuit with %s for inkey=%v",
|
||||||
pktType, pkt.inKey())
|
pktType, pkt.inKey())
|
||||||
|
|
||||||
err := s.circuits.DeleteCircuits(pkt.inKey())
|
err := s.circuits.DeleteCircuits(pkt.inKey())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warnf("Failed to tear down pending %s circuit for %x: "+
|
log.Warnf("Failed to tear down pending %s circuit for %x: "+
|
||||||
|
|
Loading…
Reference in New Issue