peer: restore the htlcManager's logCommitTimer to a persistent ticker

This commit patches a hole in our optimistic channel synchronization
logic by making the logCommitTimer a persistent ticker rather than one
that is activated after receiving a commitment, and disabled once we
send a new commitment ourselves. In the setting of batched full-duplex
channel updates, the prior approach could at times result in a benign
state desync caused by one side being one commitment ahead of the other
because one of the nodes failed to, or was unable to, provide the other
with a state update during the workflow.
This commit is contained in:
Olaoluwa Osuntokun 2017-04-11 21:59:45 -07:00
parent 3393f3a8db
commit 178f26b8d5
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 12 additions and 36 deletions

View File

@ -442,7 +442,8 @@ out:
payHash: payHash, payHash: payHash,
msg: &lnwire.UpdateFailHTLC{ msg: &lnwire.UpdateFailHTLC{
Reason: []byte{uint8(lnwire.InsufficientCapacity)}, Reason: []byte{uint8(lnwire.InsufficientCapacity)},
}, err: make(chan error, 1), },
err: make(chan error, 1),
} }
// Send the cancel message along the // Send the cancel message along the

45
peer.go
View File

@ -277,9 +277,8 @@ func (p *peer) loadActiveChannels(chans []*channeldb.OpenChannel) error {
return nil return nil
} }
// Start starts all helper goroutines the peer needs for normal operations. // Start starts all helper goroutines the peer needs for normal operations. In
// In the case this peer has already been started, then this function is a // the case this peer has already been started, then this function is a loop.
// loop.
func (p *peer) Start() error { func (p *peer) Start() error {
if atomic.AddInt32(&p.started, 1) != 1 { if atomic.AddInt32(&p.started, 1) != 1 {
return nil return nil
@ -294,7 +293,7 @@ func (p *peer) Start() error {
} }
// Before we launch any of the helper goroutines off the peer struct, // Before we launch any of the helper goroutines off the peer struct,
// we'll first ensure proper adherance to the p2p protocl. The init // we'll first ensure proper adherence to the p2p protocl. The init
// message MUST be sent before any other message. // message MUST be sent before any other message.
readErr := make(chan error, 1) readErr := make(chan error, 1)
msgChan := make(chan lnwire.Message, 1) msgChan := make(chan lnwire.Message, 1)
@ -309,7 +308,7 @@ func (p *peer) Start() error {
}() }()
select { select {
// In order to avoid blocking indefinately, we'll give the other peer // In order to avoid blocking indefinitely, we'll give the other peer
// an upper timeout of 5 seconds to respond before we bail out early. // an upper timeout of 5 seconds to respond before we bail out early.
case <-time.After(time.Second * 5): case <-time.After(time.Second * 5):
return fmt.Errorf("peer did not complete handshake within 5 " + return fmt.Errorf("peer did not complete handshake within 5 " +
@ -1069,19 +1068,14 @@ type commitmentState struct {
// htlcSwitch, or subsystem that initiated the HTLC. // htlcSwitch, or subsystem that initiated the HTLC.
cancelReasons map[uint64]lnwire.FailCode cancelReasons map[uint64]lnwire.FailCode
// pendingBatch is slice of payments which have been added to the
// channel update log, but not yet committed to latest commitment.
pendingBatch []*pendingPayment pendingBatch []*pendingPayment
// clearedHTCLs is a map of outgoing HTLCs we've committed to in our // clearedHTCLs is a map of outgoing HTLCs we've committed to in our
// chain which have not yet been settled by the upstream peer. // chain which have not yet been settled by the upstream peer.
clearedHTCLs map[uint64]*pendingPayment clearedHTCLs map[uint64]*pendingPayment
// logCommitTimer is a timer which is sent upon if we go an interval
// without receiving/sending a commitment update. It's role is to
// ensure both chains converge to identical state in a timely manner.
// TODO(roasbeef): timer should be >> then RTT
logCommitTimer *time.Timer
logCommitTick <-chan time.Time
// switchChan is a channel used to send packets to the htlc switch for // switchChan is a channel used to send packets to the htlc switch for
// forwarding. // forwarding.
switchChan chan<- *htlcPacket switchChan chan<- *htlcPacket
@ -1143,7 +1137,6 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket), pendingCircuits: make(map[uint64]*sphinx.ProcessedPacket),
sphinx: p.server.sphinx, sphinx: p.server.sphinx,
switchChan: htlcPlex, switchChan: htlcPlex,
logCommitTimer: time.NewTimer(300 * time.Millisecond),
} }
// TODO(roasbeef): check to see if able to settle any currently pending // TODO(roasbeef): check to see if able to settle any currently pending
@ -1153,6 +1146,9 @@ func (p *peer) htlcManager(channel *lnwallet.LightningChannel,
batchTimer := time.NewTicker(50 * time.Millisecond) batchTimer := time.NewTicker(50 * time.Millisecond)
defer batchTimer.Stop() defer batchTimer.Stop()
logCommitTimer := time.NewTicker(300 * time.Millisecond)
defer logCommitTimer.Stop()
out: out:
for { for {
select { select {
@ -1176,7 +1172,7 @@ out:
state.chanPoint, p.id) state.chanPoint, p.id)
break out break out
case <-state.logCommitTick: case <-logCommitTimer.C:
// If we haven't sent or received a new commitment // If we haven't sent or received a new commitment
// update in some time, check to see if we have any // update in some time, check to see if we have any
// pending updates we need to commit due to our // pending updates we need to commit due to our
@ -1508,16 +1504,6 @@ func (p *peer) handleUpstreamMsg(state *commitmentState, msg lnwire.Message) {
} }
p.queueMsg(nextRevocation, nil) p.queueMsg(nextRevocation, nil)
if !state.logCommitTimer.Stop() {
select {
case <-state.logCommitTimer.C:
default:
}
}
state.logCommitTimer.Reset(300 * time.Millisecond)
state.logCommitTick = state.logCommitTimer.C
// If both commitment chains are fully synced from our PoV, // If both commitment chains are fully synced from our PoV,
// then we don't need to reply with a signature as both sides // then we don't need to reply with a signature as both sides
// already have a commitment with the latest accepted state. // already have a commitment with the latest accepted state.
@ -1732,17 +1718,6 @@ func (p *peer) updateCommitTx(state *commitmentState) error {
state.clearedHTCLs[update.index] = update state.clearedHTCLs[update.index] = update
} }
// We've just initiated a state transition, attempt to stop the
// logCommitTimer. If the timer already ticked, then we'll consume the
// value, dropping
if state.logCommitTimer != nil && !state.logCommitTimer.Stop() {
select {
case <-state.logCommitTimer.C:
default:
}
}
state.logCommitTick = nil
// Finally, clear our the current batch, and flip the pendingUpdate // Finally, clear our the current batch, and flip the pendingUpdate
// bool to indicate were waiting for a commitment signature. // bool to indicate were waiting for a commitment signature.
// TODO(roasbeef): re-slice instead to avoid GC? // TODO(roasbeef): re-slice instead to avoid GC?