lnd: implement update pipelining and htlc trickling+batching

This commit *significantly increases* the payment throughput per-core,
per-channel of the daemon.

With this commit, updates are properly pipelined respecting the current
revocation window, htlc updates are batched, a timer is checked to push
chain convergence, and htlc updates below the batch size are
periodically flushed to the remote chain.

The current pending update timer, trickle timer, and batch size have
been arbitrarily chosen based on my local tests. In the future these
parameters should be chosen to optimize response-time and throughput
after measurements are gathered.
This commit is contained in:
Olaoluwa Osuntokun 2016-07-21 17:10:30 -07:00
parent c0a28e3b7f
commit adb23a366f
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
1 changed files with 186 additions and 67 deletions

253
peer.go
View File

@ -766,22 +766,33 @@ type pendingPayment struct {
// commitment update state-machine. This struct is used by htlcManager's to
// save meta-state required for proper functioning.
type commitmentState struct {
// TODO(roasbeef): use once trickle+batch logic is in
pendingLogLen uint32
// htlcsToSettle is a list of preimages which allow us to settle one or
// many of the pending HTLC's we've received from the upstream peer.
// TODO(roasbeef): should send sig to settle once preimage is known.
htlcsToSettle [][32]byte
htlcsToSettle map[uint32][32]byte
// sigPending is a bool which indicates if we're currently awaiting a
// signature response to a commitment update we've initiated.
sigPending bool
// TODO(roasbeef): use once trickle+batch logic is in
pendingBatch []*pendingPayment
// clearedHTCLs is a map of outgoing HTLC's we've committed to in our
// chain which have not yet been settled by the upstream peer.
clearedHTCLs map[uint32]*pendingPayment
// numUnAcked is a counter tracking the number of unacked changes we've
// sent. A change is acked once we receive a new update to our local
// chain from the remote peer.
numUnAcked uint32
// logCommitTimer is a timer which is sent upon if we go an interval
// without receiving/sending a commitment update. Its role is to
// ensure both chains converge to identical state in a timely manner.
// TODO(roasbeef): timer should be >> than RTT
logCommitTimer <-chan time.Time
// switchChan is a channel used to send packets to the htlc switch for
// forwarding.
switchChan chan<- *htlcPacket
channel *lnwallet.LightningChannel
chanPoint *wire.OutPoint
}
@ -800,7 +811,7 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
upstreamLink <-chan lnwire.Message) {
chanStats := channel.StateSnapshot()
peerLog.Tracef("HTLC manager for ChannelPoint(%v) started, "+
peerLog.Infof("HTLC manager for ChannelPoint(%v) started, "+
"our_balance=%v, their_balance=%v, chain_height=%v",
channel.ChannelPoint(), chanStats.LocalBalance,
chanStats.RemoteBalance, chanStats.NumUpdates)
@ -817,13 +828,58 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
}
state := &commitmentState{
channel: channel,
chanPoint: channel.ChannelPoint(),
clearedHTCLs: make(map[uint32]*pendingPayment),
channel: channel,
chanPoint: channel.ChannelPoint(),
clearedHTCLs: make(map[uint32]*pendingPayment),
htlcsToSettle: make(map[uint32][32]byte),
switchChan: htlcPlex,
}
batchTimer := time.Tick(10 * time.Millisecond)
out:
for {
select {
// TODO(roasbeef): prevent leaking ticker?
case <-state.logCommitTimer:
// If we haven't sent or received a new commitment
// update in some time, check to see if we have any
// pending updates we need to commit. If so, then send
// an update, incrementing the unacked counter if
// successful.
if !state.channel.PendingUpdates() {
continue
}
if sent, err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
p.Disconnect()
break out
} else if sent {
state.numUnAcked += 1
}
case <-batchTimer:
// If the current batch is empty, then we have no work
// here.
if len(state.pendingBatch) == 0 {
continue
}
// Otherwise, attempt to extend the remote commitment
// chain including all the currently pending entries.
// If the send was unsuccessful, then abandon the
// update, waiting for the revocation window to open
// up.
if sent, err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
p.Disconnect()
break out
} else if !sent {
continue
}
state.numUnAcked += 1
case pkt := <-downstreamLink:
p.handleDownStreamPkt(state, pkt)
case msg, ok := <-upstreamLink:
@ -855,20 +911,29 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
// downstream channel, so we add the new HTLC
// to our local log, then update the commitment
// chains.
index := state.channel.AddHTLC(htlc, false)
index := state.channel.AddHTLC(htlc)
p.queueMsg(htlc, nil)
// TODO(roasbeef): batch trickle timer + cap
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
}
state.sigPending = true
state.clearedHTCLs[index] = &pendingPayment{
state.pendingBatch = append(state.pendingBatch, &pendingPayment{
htlc: htlc,
index: index,
err: pkt.err,
})
// If this newly added update exceeds the max batch size, then
// initiate an update.
// TODO(roasbeef): enforce max HTLC's in flight limit
if len(state.pendingBatch) >= 10 {
if sent, err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
p.Disconnect()
return
} else if !sent {
return
}
state.numUnAcked += 1
}
}
}
@ -879,11 +944,12 @@ func (p *peer) handleDownStreamPkt(state *commitmentState, pkt *htlcPacket) {
func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
switch htlcPkt := msg.(type) {
// TODO(roasbeef): timeouts
// * fail if can't parse sphinx mix-header
case *lnwire.HTLCAddRequest:
// We just received an add request from an upstream peer, so we
// add it to our state machine, then add the HTLC to our
// "settle" list in the event that we know the pre-image
state.channel.AddHTLC(htlcPkt, true)
index := state.channel.ReceiveHTLC(htlcPkt)
rHash := htlcPkt.RedemptionHashes[0]
if invoice, found := p.server.invoices.lookupInvoice(rHash); found {
@ -891,12 +957,13 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
// * onion layer strip should also be before invoice lookup
// * also can immediately send the settle msg
pre := invoice.paymentPreimage
state.htlcsToSettle = append(state.htlcsToSettle, pre)
state.htlcsToSettle[index] = pre
}
case *lnwire.HTLCSettleRequest:
// TODO(roasbeef): this assumes no "multi-sig"
pre := htlcPkt.RedemptionProofs[0]
if _, err := state.channel.SettleHTLC(pre, true); err != nil {
idx := uint32(htlcPkt.HTLCKey)
if err := state.channel.ReceiveHTLCSettle(pre, idx); err != nil {
// TODO(roasbeef): broadcast on-chain
peerLog.Errorf("settle for outgoing HTLC rejected: %v", err)
p.Disconnect()
@ -914,20 +981,16 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
return
}
// If we didn't initiate this state transition, then we'll
// update the remote commitment chain with a new commitment.
// Otherwise, we can reset the pending bit as we received the
// signature we were expecting.
// TODO(roasbeef): move sig updates to own trigger
// * can remove sigPending if so
if !state.sigPending {
if err := p.updateCommitTx(state); err != nil {
if state.numUnAcked > 0 {
state.numUnAcked -= 1
state.logCommitTimer = time.Tick(300 * time.Millisecond)
} else {
if _, err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
p.Disconnect()
return
}
} else {
state.sigPending = false
}
// Finally, since we just accepted a new state, send the remote
@ -948,75 +1011,90 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
p.Disconnect()
return
}
// TODO(roasbeef): send the locked-in HTLC's over the plex chan
// to the switch.
peerLog.Debugf("htlcs ready to forward: %v",
spew.Sdump(htlcsToForward))
// If any of the htlc's eligible for forwarding are pending
// settling or timing out previous outgoing payments, then we
// can remove them from the pending set, and signal the requester
// (if existing) that the payment has been fully fulfilled.
numSettled := 0
for _, htlc := range htlcsToForward {
// Send this fully activated HTLC to the htlc switch to
// continue the chained clear/settle.
state.switchChan <- p.logEntryToHtlcPkt(htlc)
if p, ok := state.clearedHTCLs[htlc.ParentIndex]; ok {
peerLog.Debugf("local htlc %v cleared",
spew.Sdump(p.htlc))
p.err <- nil
delete(state.clearedHTCLs, htlc.ParentIndex)
}
}
// A full state transition has been completed, if we don't need
// to settle any HTLC's, then we're done.
if len(state.htlcsToSettle) == 0 {
return
}
// Otherwise, we have some pending HTLC's which we can pull
// funds from, thereby settling.
peerLog.Tracef("settling %v HTLC's", len(state.htlcsToSettle))
for _, pre := range state.htlcsToSettle {
// Add each HTLC settle update to the channel's state
// update log, also sending the log update to the
// remote party.
logIndex, err := state.channel.SettleHTLC(pre, false)
if err != nil {
peerLog.Errorf("unable to settle htlc: %v", err)
// TODO(roasbeef): rework log entries to a shared
// interface.
if htlc.EntryType != lnwallet.Add {
continue
}
// If we can't immediately settle this HTLC, then we
// can halt processing here.
preimage, ok := state.htlcsToSettle[htlc.Index]
if !ok {
continue
}
// Otherwise, we settle this HTLC within our local
// state update log, then send the update entry to the
// remote party.
logIndex, err := state.channel.SettleHTLC(preimage)
if err != nil {
peerLog.Errorf("unable to settle htlc: %v", err)
p.Disconnect()
continue
}
settleMsg := &lnwire.HTLCSettleRequest{
ChannelPoint: state.chanPoint,
HTLCKey: lnwire.HTLCKey(logIndex),
RedemptionProofs: [][32]byte{pre},
RedemptionProofs: [][32]byte{preimage},
}
p.queueMsg(settleMsg, nil)
delete(state.htlcsToSettle, htlc.Index)
numSettled++
}
if numSettled == 0 {
return
}
// With all the settle updates added to the local and remote
// HTLC logs, initiate a state transition by updating the
// remote commitment chain.
if err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update "+
"commitment: %v", err)
if sent, err := p.updateCommitTx(state); err != nil {
peerLog.Errorf("unable to update commitment: %v", err)
p.Disconnect()
return
} else if sent {
// TODO(roasbeef): wait to delete from htlcsToSettle?
state.numUnAcked += 1
}
state.sigPending = true
state.htlcsToSettle = nil
}
}
// updateCommitTx signs, then sends an update to the remote peer adding a new
// commitment to their commitment chain which includes all the latest updates
// we've received+processed up to this point.
func (p *peer) updateCommitTx(state *commitmentState) error {
func (p *peer) updateCommitTx(state *commitmentState) (bool, error) {
sigTheirs, logIndexTheirs, err := state.channel.SignNextCommitment()
if err != nil {
return fmt.Errorf("unable to sign next commitment: %v", err)
if err == lnwallet.ErrNoWindow {
peerLog.Tracef("revocation window exhausted, unable to send %v",
len(state.pendingBatch))
return false, nil
} else if err != nil {
return false, err
}
parsedSig, err := btcec.ParseSignature(sigTheirs, btcec.S256())
if err != nil {
return fmt.Errorf("unable to parse sig: %v", err)
return false, fmt.Errorf("unable to parse sig: %v", err)
}
commitSig := &lnwire.CommitSignature{
@ -1026,7 +1104,48 @@ func (p *peer) updateCommitTx(state *commitmentState) error {
}
p.queueMsg(commitSig, nil)
return nil
// Move all pending updates to the map of cleared HTLC's, clearing out
// the set of pending updates.
for _, update := range state.pendingBatch {
// TODO(roasbeef): add parsed next-hop info to pending batch
// for multi-hop forwarding
state.clearedHTCLs[update.index] = update
}
state.logCommitTimer = nil
state.pendingBatch = nil
return true, nil
}
// logEntryToHtlcPkt translates a Lightning Commitment Protocol (LCP) log
// entry into the htlcPacket that an htlcManager forwards to the htlcSwitch.
// The returned packet carries the entry's amount, this peer's lightningID as
// its source, and a wire message matching the entry type. Entries that are
// neither an Add nor a Settle produce a packet whose message is nil.
func (p *peer) logEntryToHtlcPkt(pd *lnwallet.PaymentDescriptor) *htlcPacket {
	// TODO(roasbeef): alter after switch to log entry interface
	var wireMsg lnwire.Message
	if pd.EntryType == lnwallet.Add {
		// TODO(roasbeef): timeout, onion blob, etc
		wireMsg = &lnwire.HTLCAddRequest{
			Amount:           lnwire.CreditsAmount(pd.Amount),
			RedemptionHashes: [][32]byte{pd.RHash},
		}
	} else if pd.EntryType == lnwallet.Settle {
		// TODO(roasbeef): thread through preimage
		wireMsg = &lnwire.HTLCSettleRequest{
			HTLCKey: lnwire.HTLCKey(pd.ParentIndex),
		}
	}

	// TODO(roasbeef): set dest via onion blob or state
	return &htlcPacket{
		amt: pd.Amount,
		msg: wireMsg,
		src: p.lightningID,
	}
}
// TODO(roasbeef): make all start/stop mutexes a CAS