htlcswitch: within link, with each new block, check to see if commit fee should change

In this commit we add a new case to the main select statement within a
channel link. This select statement will serve as a Sipping Bird which
will check the network fee rate (as returned by the fee estimator) and
compare that to the fee on the commitment transaction. Using the
shouldAdjustCommitFee function, we determine if we should update the
commitment fee. If so, then we’ll send an UpdateFee message and also
trigger a new commitment update.

We also add a new unit test: TestChannelLinkUpdateCommitFee to ensure
that we update the fee accordingly if the fee increases or decreases by
a large portion.
This commit is contained in:
Olaoluwa Osuntokun 2017-11-23 22:31:45 -06:00
parent c560200ba1
commit 62473009b5
No known key found for this signature in database
GPG Key ID: 964EA263DD637C21
4 changed files with 316 additions and 28 deletions

View File

@ -118,6 +118,11 @@ type ChannelLinkConfig struct {
// in thread-safe manner.
Registry InvoiceDatabase
// FeeEstimator is an instance of a live fee estimator which will be
// used to dynamically regulate the current fee of the commitment
// transaction to ensure timely confirmation.
FeeEstimator lnwallet.FeeEstimator
// BlockEpochs is an active block epoch event stream backed by an
// active ChainNotifier instance. The ChannelLink will use new block
// notifications sent over this channel to decide when a _new_ HTLC is
@ -138,6 +143,7 @@ type ChannelLinkConfig struct {
// HodlHTLC should be active if you want this node to refrain from
// settling all incoming HTLCs with the sender if it finds itself to be
// the exit node.
//
// NOTE: HodlHTLC should be active in conjunction with DebugHTLC.
HodlHTLC bool
@ -286,6 +292,27 @@ func (l *channelLink) Stop() {
l.cfg.BlockEpochs.Cancel()
}
// sampleNetworkFee samples the current fee rate on the network to get into the
// chain in a timely manner. The returned value is expressed in fee-per-kw, as
// this is the native rate used when computing the fee for commitment
// transactions, and the second-level HTLC transactions.
func (l *channelLink) sampleNetworkFee() (btcutil.Amount, error) {
// We'll first query for the sat/weight recommended to be confirmed
// within 3blocks.
feePerWeight, err := l.cfg.FeeEstimator.EstimateFeePerWeight(3)
if err != nil {
return 0, err
}
// Once we have this fee rate, we'll convert to sat-per-kw.
feePerKw := feePerWeight * 1000
log.Debugf("ChannelLink(%v): sampled fee rate for 3 block conf: %v "+
"sat/kw", l, int64(feePerKw))
return feePerKw, nil
}
// shouldAdjustCommitFee returns true if we should update our commitment fee to
// match that of the network fee. We'll only update our commitment fee if the
// network fee is +/- 10% to our network fee.
@ -417,22 +444,45 @@ func (l *channelLink) htlcManager() {
out:
for {
select {
// A new block has arrived, we'll examine all the active HTLC's
// to see if any of them have expired, and also update our
// A new block has arrived, we'll check the network fee to see
// if we should adjust our commitment fee , and also update our
// track of the best current height.
case blockEpoch, ok := <-l.cfg.BlockEpochs.Epochs:
if !ok {
break out
}
log.Tracef("ChannelPoint(%v): new block(height=%v, "+
"hash=%v) examining active HTLC's",
l.channel.ChannelPoint(), blockEpoch.Height,
blockEpoch.Hash)
// TODO(roasbeef): check HTLC's for expiry
l.bestHeight = uint32(blockEpoch.Height)
// If we're not the initiator of the channel, don't we
// don't control the fees, so we can ignore this.
if !l.channel.IsInitiator() {
continue
}
// If we are the initiator, then we'll sample the
// current fee rate to get into the chain within 3
// blocks.
feePerKw, err := l.sampleNetworkFee()
if err != nil {
log.Errorf("unable to sample network fee: %v", err)
continue
}
// We'll check to see if we should update the fee rate
// based on our current set fee rate.
commitFee := l.channel.CommitFeeRate()
if !shouldAdjustCommitFee(feePerKw, commitFee) {
continue
}
// If we do, then we'll send a new UpdateFee message to
// the remote party, to be locked in with a new update.
if err := l.updateChannelFee(feePerKw); err != nil {
log.Errorf("unable to update fee rate: %v", err)
continue
}
// The underlying channel has notified us of a unilateral close
// carried out by the remote peer. In the case of such an
// event, we'll wipe the channel state from the peer, and mark
@ -733,7 +783,7 @@ func (l *channelLink) handleUpstreamMsg(msg lnwire.Message) {
// need to re-transmit any messages to the remote party.
msgsToReSend, err := l.channel.ProcessChanSyncMsg(msg)
if err != nil {
// TODO(roasbeef): check conrete type of error, act
// TODO(roasbeef): check concrete type of error, act
// accordingly
l.fail("unable to handle upstream reestablish "+
"message: %v", err)
@ -1071,14 +1121,22 @@ func (l *channelLink) HandleChannelUpdate(message lnwire.Message) {
// updateChannelFee updates the commitment fee-per-kw on this channel by
// committing to an update_fee message.
func (l *channelLink) updateChannelFee(feePerKw btcutil.Amount) error {
// Update local fee.
log.Infof("ChannelPoint(%v): updating commit fee to %v sat/kw", l,
feePerKw)
// First, we'll update the local fee on our commitment.
if err := l.channel.UpdateFee(feePerKw); err != nil {
return err
}
// Send fee update to remote.
// We'll then attempt to send a new UpdateFee message, and also lock it
// in immediately by triggering a commitment update.
msg := lnwire.NewUpdateFee(l.ChanID(), feePerKw)
return l.cfg.Peer.SendMessage(msg)
if err := l.cfg.Peer.SendMessage(msg); err != nil {
return err
}
return l.updateCommitTx()
}
// processLockedInHtlcs serially processes each of the log updates which have

View File

@ -2119,3 +2119,148 @@ func TestShouldAdjustCommitFee(t *testing.T) {
}
}
}
// TestChannelLinkUpdateCommitFee tests that when a new block comes in, the
// channel link properly checks to see if it should update the commitment fee.
func TestChannelLinkUpdateCommitFee(t *testing.T) {
t.Parallel()
// First, we'll create our traditional three hop network. We'll only be
// interacting with and asserting the state of two of the end points
// for this test.
channels, cleanUp, _, err := createClusterChannels(
btcutil.SatoshiPerBitcoin*3,
btcutil.SatoshiPerBitcoin*5)
if err != nil {
t.Fatalf("unable to create channel: %v", err)
}
defer cleanUp()
n := newThreeHopNetwork(t, channels.aliceToBob, channels.bobToAlice,
channels.bobToCarol, channels.carolToBob, testStartingHeight)
// First, we'll set up some message interceptors to ensure that the
// proper messages are sent when updating fees.
chanID := n.aliceChannelLink.ChanID()
messages := []expectedMessage{
{"alice", "bob", &lnwire.ChannelReestablish{}, false},
{"bob", "alice", &lnwire.ChannelReestablish{}, false},
{"alice", "bob", &lnwire.FundingLocked{}, false},
{"bob", "alice", &lnwire.FundingLocked{}, false},
{"alice", "bob", &lnwire.UpdateFee{}, false},
{"alice", "bob", &lnwire.CommitSig{}, false},
{"bob", "alice", &lnwire.RevokeAndAck{}, false},
{"bob", "alice", &lnwire.CommitSig{}, false},
{"alice", "bob", &lnwire.RevokeAndAck{}, false},
}
n.aliceServer.intersect(createInterceptorFunc("[alice] <-- [bob]",
"alice", messages, chanID, false))
n.bobServer.intersect(createInterceptorFunc("[alice] --> [bob]",
"bob", messages, chanID, false))
if err := n.start(); err != nil {
t.Fatal(err)
}
defer n.stop()
defer n.feeEstimator.Stop()
// First, we'll start off all channels at "height" 9000 by sending a
// new epoch to all the clients.
select {
case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9000,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
select {
case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9000,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
startingFeeRate := channels.aliceToBob.CommitFeeRate()
// Next, we'll send the first fee rate response to Alice.
select {
case n.feeEstimator.weightFeeIn <- startingFeeRate / 1000:
case <-time.After(time.Second * 5):
t.Fatalf("alice didn't query for the new " +
"network fee")
}
time.Sleep(time.Millisecond * 500)
// The fee rate on the alice <-> bob channel should still be the same
// on both sides.
aliceFeeRate := channels.aliceToBob.CommitFeeRate()
bobFeeRate := channels.bobToAlice.CommitFeeRate()
if aliceFeeRate != bobFeeRate {
t.Fatalf("fee rates don't match: expected %v got %v",
aliceFeeRate, bobFeeRate)
}
if aliceFeeRate != startingFeeRate {
t.Fatalf("alice's fee rate shouldn't have changed: "+
"expected %v, got %v", aliceFeeRate, startingFeeRate)
}
if bobFeeRate != startingFeeRate {
t.Fatalf("bob's fee rate shouldn't have changed: "+
"expected %v, got %v", bobFeeRate, startingFeeRate)
}
// Now we'll send a new block update to all end points, with a new
// height THAT'S OVER 9000!!!
select {
case n.aliceBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9001,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
select {
case n.bobFirstBlockEpoch <- &chainntnfs.BlockEpoch{
Height: 9001,
}:
case <-time.After(time.Second * 5):
t.Fatalf("link didn't read block epoch")
}
// Next, we'll set up a deliver a fee rate that's triple the current
// fee rate. This should cause the Alice (the initiator) to trigger a
// fee update.
newFeeRate := startingFeeRate * 3
select {
case n.feeEstimator.weightFeeIn <- newFeeRate:
case <-time.After(time.Second * 5):
t.Fatalf("alice didn't query for the new " +
"network fee")
}
time.Sleep(time.Second * 1)
// At this point, Alice should've triggered a new fee update that
// increased the fee rate to match the new rate.
//
// We'll scale the new fee rate by 100 as we deal with units of fee
// per-kw.
expectedFeeRate := newFeeRate * 1000
aliceFeeRate = channels.aliceToBob.CommitFeeRate()
bobFeeRate = channels.bobToAlice.CommitFeeRate()
if aliceFeeRate != expectedFeeRate {
t.Fatalf("alice's fee rate didn't change: expected %v, got %v",
expectedFeeRate, aliceFeeRate)
}
if bobFeeRate != expectedFeeRate {
t.Fatalf("bob's fee rate didn't change: expected %v, got %v",
expectedFeeRate, aliceFeeRate)
}
if aliceFeeRate != bobFeeRate {
t.Fatalf("fee rates don't match: expected %v got %v",
aliceFeeRate, bobFeeRate)
}
}

View File

@ -22,8 +22,44 @@ import (
"github.com/roasbeef/btcd/chaincfg/chainhash"
"github.com/roasbeef/btcd/txscript"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
type mockFeeEstimator struct {
byteFeeIn chan btcutil.Amount
weightFeeIn chan btcutil.Amount
quit chan struct{}
}
func (m *mockFeeEstimator) EstimateFeePerByte(numBlocks uint32) (btcutil.Amount, error) {
select {
case feeRate := <-m.byteFeeIn:
return feeRate, nil
case <-m.quit:
return 0, fmt.Errorf("exiting")
}
}
func (m *mockFeeEstimator) EstimateFeePerWeight(numBlocks uint32) (btcutil.Amount, error) {
select {
case feeRate := <-m.weightFeeIn:
return feeRate, nil
case <-m.quit:
return 0, fmt.Errorf("exiting")
}
}
func (m *mockFeeEstimator) Start() error {
return nil
}
func (m *mockFeeEstimator) Stop() error {
close(m.quit)
return nil
}
var _ lnwallet.FeeEstimator = (*mockFeeEstimator)(nil)
type mockServer struct {
started int32
shutdown int32
@ -304,6 +340,8 @@ func (s *mockServer) readHandler(message lnwire.Message) error {
return nil
case *lnwire.ChannelReestablish:
targetChan = msg.ChanID
case *lnwire.UpdateFee:
targetChan = msg.ChanID
default:
return fmt.Errorf("unknown message type: %T", msg)
}

View File

@ -365,6 +365,8 @@ func getChanID(msg lnwire.Message) (lnwire.ChannelID, error) {
chanID = msg.ChanID
case *lnwire.FundingLocked:
chanID = msg.ChanID
case *lnwire.UpdateFee:
chanID = msg.ChanID
default:
return chanID, fmt.Errorf("unknown type: %T", msg)
}
@ -426,13 +428,20 @@ func generateRoute(hops ...ForwardingInfo) ([lnwire.OnionPacketSize]byte, error)
type threeHopNetwork struct {
aliceServer *mockServer
aliceChannelLink *channelLink
aliceBlockEpoch chan *chainntnfs.BlockEpoch
firstBobChannelLink *channelLink
bobFirstBlockEpoch chan *chainntnfs.BlockEpoch
firstBobChannelLink *channelLink
bobServer *mockServer
secondBobChannelLink *channelLink
bobSecondBlockEpoch chan *chainntnfs.BlockEpoch
carolChannelLink *channelLink
carolServer *mockServer
carolBlockEpoch chan *chainntnfs.BlockEpoch
feeEstimator *mockFeeEstimator
globalPolicy ForwardingPolicy
}
@ -698,21 +707,29 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
bobServer := newMockServer(t, "bob")
carolServer := newMockServer(t, "carol")
// Create mock decoder instead of sphinx one in order to mock the
// route which htlc should follow.
// Create mock decoder instead of sphinx one in order to mock the route
// which htlc should follow.
decoder := &mockIteratorDecoder{}
globalEpoch := &chainntnfs.BlockEpochEvent{
Epochs: make(chan *chainntnfs.BlockEpoch),
Cancel: func() {
},
feeEstimator := &mockFeeEstimator{
byteFeeIn: make(chan btcutil.Amount),
weightFeeIn: make(chan btcutil.Amount),
quit: make(chan struct{}),
}
globalPolicy := ForwardingPolicy{
MinHTLC: lnwire.NewMSatFromSatoshis(5),
BaseFee: lnwire.NewMSatFromSatoshis(1),
TimeLockDelta: 6,
}
obfuscator := newMockObfuscator()
aliceEpochChan := make(chan *chainntnfs.BlockEpoch)
aliceEpoch := &chainntnfs.BlockEpochEvent{
Epochs: aliceEpochChan,
Cancel: func() {
},
}
aliceChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -725,7 +742,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
Registry: aliceServer.registry,
BlockEpochs: globalEpoch,
BlockEpochs: aliceEpoch,
FeeEstimator: feeEstimator,
SyncStates: true,
},
aliceChannel,
@ -735,6 +753,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
t.Fatalf("unable to add alice channel link: %v", err)
}
bobFirstEpochChan := make(chan *chainntnfs.BlockEpoch)
bobFirstEpoch := &chainntnfs.BlockEpochEvent{
Epochs: bobFirstEpochChan,
Cancel: func() {
},
}
firstBobChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -747,7 +771,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
Registry: bobServer.registry,
BlockEpochs: globalEpoch,
BlockEpochs: bobFirstEpoch,
FeeEstimator: feeEstimator,
SyncStates: true,
},
firstBobChannel,
@ -757,6 +782,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
t.Fatalf("unable to add first bob channel link: %v", err)
}
bobSecondEpochChan := make(chan *chainntnfs.BlockEpoch)
bobSecondEpoch := &chainntnfs.BlockEpochEvent{
Epochs: bobSecondEpochChan,
Cancel: func() {
},
}
secondBobChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -769,7 +800,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
Registry: bobServer.registry,
BlockEpochs: globalEpoch,
BlockEpochs: bobSecondEpoch,
FeeEstimator: feeEstimator,
SyncStates: true,
},
secondBobChannel,
@ -779,6 +811,12 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
t.Fatalf("unable to add second bob channel link: %v", err)
}
carolBlockEpoch := make(chan *chainntnfs.BlockEpoch)
carolEpoch := &chainntnfs.BlockEpochEvent{
Epochs: bobSecondEpochChan,
Cancel: func() {
},
}
carolChannelLink := NewChannelLink(
ChannelLinkConfig{
FwrdingPolicy: globalPolicy,
@ -791,7 +829,8 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
},
GetLastChannelUpdate: mockGetChanUpdateMessage,
Registry: carolServer.registry,
BlockEpochs: globalEpoch,
BlockEpochs: carolEpoch,
FeeEstimator: feeEstimator,
SyncStates: true,
},
carolChannel,
@ -802,14 +841,22 @@ func newThreeHopNetwork(t testing.TB, aliceChannel, firstBobChannel,
}
return &threeHopNetwork{
aliceServer: aliceServer,
aliceChannelLink: aliceChannelLink.(*channelLink),
firstBobChannelLink: firstBobChannelLink.(*channelLink),
aliceServer: aliceServer,
aliceChannelLink: aliceChannelLink.(*channelLink),
aliceBlockEpoch: aliceEpochChan,
firstBobChannelLink: firstBobChannelLink.(*channelLink),
bobFirstBlockEpoch: bobFirstEpochChan,
bobServer: bobServer,
secondBobChannelLink: secondBobChannelLink.(*channelLink),
carolChannelLink: carolChannelLink.(*channelLink),
carolServer: carolServer,
bobSecondBlockEpoch: bobSecondEpochChan,
carolChannelLink: carolChannelLink.(*channelLink),
carolServer: carolServer,
carolBlockEpoch: carolBlockEpoch,
feeEstimator: feeEstimator,
globalPolicy: globalPolicy,
}
}