channeldb: split RecordChannelDelta into two distinct methods

This commit splits the previously added RecordChannelDelta method into
two distinct methods: UpdateCommitment and AppendToRevocationLog. The
former method is to be used once the local party revokes their current
commitment, and the latter method to be used once the remote party
revokes their current commitment.

With the addition of the UpdateCommitment method, the active HTLC’s
from the local node’s point of view are now persisted to disk.
Snapshots returned by the channel now also includes the current set of
active HTLC’s. In order to maintain thread safety the channels mutex is
now grabbed within methods which modify/read state but don’t do so
solely via a boltDB transaction.

The tests have been updated to account for the storage of HTLC’s needed
in order to assert proper behavior.
This commit is contained in:
Olaoluwa Osuntokun 2016-09-06 19:17:34 -07:00
parent 2235785ed8
commit e60778a867
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
2 changed files with 249 additions and 102 deletions

View File

@ -75,6 +75,10 @@ var (
// commitment transactions in addition to the csvDelay for both.
commitTxnsKey = []byte("ctk")
// currentHtlcKey stores the set of fully locked-in HTLC's on our
// latest commitment state.
currentHtlcKey = []byte("chk")
// fundingTxnKey stroes the funding tx, our encrypted multi-sig key,
// and finally 2-of-2 multisig redeem script.
fundingTxnKey = []byte("fsk")
@ -100,12 +104,8 @@ type OpenChannel struct {
TheirLNID [wire.HashSize]byte
// The ID of a channel is the txid of the funding transaction.
ChanID *wire.OutPoint
ChanID *wire.OutPoint
MinFeePerKb btcutil.Amount
// Our reserve. Assume symmetric reserve amounts. Only needed if the
// funding type is CLTV.
//ReserveAmount btcutil.Amount
// Keys for both sides to be used for the commitment transactions.
OurCommitKey *btcec.PublicKey
@ -152,6 +152,8 @@ type OpenChannel struct {
TotalNetFees uint64 // TODO(roasbeef): total fees paid too?
CreationTime time.Time // TODO(roasbeef): last update time?
Htlcs []*HTLC
// TODO(roasbeef): eww
Db *DB
@ -165,6 +167,9 @@ type OpenChannel struct {
// NOTE: This method requires an active EncryptorDecryptor to be registered in
// order to encrypt sensitive information.
func (c *OpenChannel) FullSync() error {
c.Lock()
defer c.Unlock()
return c.Db.store.Update(func(tx *bolt.Tx) error {
// TODO(roasbeef): add helper funcs to create scoped update
// First fetch the top level bucket which stores all data related to
@ -192,34 +197,61 @@ func (c *OpenChannel) FullSync() error {
return err
}
if chanIDBucket.Get(b.Bytes()) == nil {
chanIDBucket.Put(b.Bytes(), nil)
if err := chanIDBucket.Put(b.Bytes(), nil); err != nil {
return err
}
}
return putOpenChannel(chanBucket, nodeChanBucket, c)
})
}
// SyncRevocation writes to disk the current revocation state of the channel.
// The revocation state is defined as the current elkrem receiver, and the
// latest unrevoked key+hash for the remote party.
func (c *OpenChannel) SyncRevocation() error {
// UpdateCommitment updates the on-disk state of our currently broadcastable
// commitment state. This method is to be called once we have revoked our prior
// commitment state, accepting the new state as defined by the passed
// parameters.
func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx,
newSig []byte, delta *ChannelDelta) error {
c.Lock()
defer c.Unlock()
return c.Db.store.Update(func(tx *bolt.Tx) error {
// First fetch the top level bucket which stores all data related to
// current, active channels.
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
}
// Within this top level bucket, fetch the bucket dedicated to storing
// open channel data specific to the remote node.
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(c.TheirLNID[:])
id := c.TheirLNID[:]
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id)
if err != nil {
return err
}
// Sync the current elkrem state to disk.
if err := putChanEklremState(nodeChanBucket, c); err != nil {
// TODO(roasbeef): modify the funcs below to take values
// directly, otherwise need to roll back to prior state. Could
// also make copy above, then modify to pass in.
c.OurCommitTx = newCommitment
c.OurCommitSig = newSig
c.OurBalance = delta.LocalBalance
c.TheirBalance = delta.RemoteBalance
c.NumUpdates = uint64(delta.UpdateNum)
c.Htlcs = delta.Htlcs
// First we'll write out the current latest dynamic channel
// state: the current channel balance, the number of updates,
// and our latest commitment transaction+sig.
// TODO(roasbeef): re-make schema s.t this is a single put
if err := putChanCapacity(chanBucket, c); err != nil {
return err
}
if err := putChanNumUpdates(chanBucket, c); err != nil {
return err
}
if err := putChanCommitTxns(nodeChanBucket, c); err != nil {
return err
}
if err := putCurrentHtlcs(nodeChanBucket, delta.Htlcs, c.ChanID); err != nil {
return err
}
@ -245,11 +277,24 @@ type HTLC struct {
// must wait before reclaiming the funds in limbo.
RefundTimeout uint32
// RevocationTimeout is the relative timeout the party who broadcasts
// RevocationDelay is the relative timeout the party who broadcasts
// the commitment transaction must wait before being able to fully
// sweep the funds on-chain in the case of a unilateral channel
// closure.
RevocationTimeout uint32
RevocationDelay uint32
}
// Copy returns a full copy of the target HTLC.
func (h *HTLC) Copy() HTLC {
clone := HTLC{
Incoming: h.Incoming,
Amt: h.Amt,
RefundTimeout: h.RefundTimeout,
RevocationDelay: h.RevocationDelay,
}
copy(clone.RHash[:], h.RHash[:])
return clone
}
// ChannelDelta is a snapshot of the commitment state at a particular point in
@ -263,13 +308,12 @@ type ChannelDelta struct {
Htlcs []*HTLC
}
// RecordChannelDelta records the new state transition within an on-disk
// append-only log which records all state transitions. Additionally, the
// internal balances and update counter of the target OpenChannel are updated
// accordingly based on the passed delta.
func (c *OpenChannel) RecordChannelDelta(newCommitment *wire.MsgTx,
newSig []byte, delta *ChannelDelta) error {
// AppendToRevocationLog records the new state transition within an on-disk
// append-only log which records all state transitions by the remote peer. In
// the case of an uncooperative broadcast of a prior state by the remote peer,
// this log can be consulted in order to reconstruct the state needed to
// rectify the situation.
func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error {
return c.Db.store.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
@ -278,32 +322,19 @@ func (c *OpenChannel) RecordChannelDelta(newCommitment *wire.MsgTx,
id := c.TheirLNID[:]
nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id)
if nodeChanBucket == nil {
return ErrNoActiveChannels
}
// TODO(roasbeef): revisit in-line mutation
c.OurCommitTx = newCommitment
c.OurBalance = delta.LocalBalance
c.TheirBalance = delta.RemoteBalance
c.OurCommitSig = newSig
c.NumUpdates = uint64(delta.UpdateNum)
// First we'll write out the current latest dynamic channel
// state: the current channel balance, the number of updates,
// and our latest commitment transaction+sig.
if err := putChanCapacity(chanBucket, c); err != nil {
return err
}
if err := putChanNumUpdates(chanBucket, c); err != nil {
return err
}
if err := putChanCommitTxns(nodeChanBucket, c); err != nil {
if err != nil {
return err
}
// With the current state updated, append a new log entry
// recording this the delta of this state transition.
// Persist the latest elkrem state to disk as the remote peer
// has just added to our local elkrem receiver, and given us a
// new pending revocation key.
if err := putChanElkremState(nodeChanBucket, c); err != nil {
return err
}
// With the current elkrem state updated, append a new log
// entry recording this the delta of this state transition.
// TODO(roasbeef): could make the deltas relative, would save
// space, but then tradeoff for more disk-seeks to recover the
// full state.
@ -355,6 +386,7 @@ func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error)
// entails deleting all saved state within the database concerning this
// channel, as well as created a small channel summary for record keeping
// purposes.
// TODO(roasbeef): delete on-disk set of HTLC's
func (c *OpenChannel) CloseChannel() error {
return c.Db.store.Update(func(tx *bolt.Tx) error {
// First fetch the top level bucket which stores all data related to
@ -415,15 +447,16 @@ type ChannelSnapshot struct {
TotalSatoshisSent uint64
TotalSatoshisReceived uint64
// TODO(roasbeef): fee stuff
updateNum uint64
channel *OpenChannel
Htlcs []HTLC
}
// Snapshot returns a read-only snapshot of the current channel state. This
// snapshot includes information concerning the current settled balance within
// the channel, meta-data detailing total flows, and any outstanding HTLCs.
func (c *OpenChannel) Snapshot() *ChannelSnapshot {
c.RLock()
defer c.RUnlock()
snapshot := &ChannelSnapshot{
ChannelPoint: c.ChanID,
Capacity: c.Capacity,
@ -435,8 +468,12 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot {
}
copy(snapshot.RemoteID[:], c.TheirLNID[:])
// TODO(roasbeef): cache current channel delta in memory, either merge
// or replace with ChannelSnapshot
// Copy over the current set of HTLC's to ensure the caller can't
// mutate our internal state.
snapshot.Htlcs = make([]HTLC, len(c.Htlcs))
for i, h := range c.Htlcs {
snapshot.Htlcs[i] = h.Copy()
}
return snapshot
}
@ -491,12 +528,16 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err := putChanFundingInfo(nodeChanBucket, channel); err != nil {
return err
}
if err := putChanEklremState(nodeChanBucket, channel); err != nil {
if err := putChanElkremState(nodeChanBucket, channel); err != nil {
return err
}
if err := putChanDeliveryScripts(nodeChanBucket, channel); err != nil {
return err
}
if err := putCurrentHtlcs(nodeChanBucket, channel.Htlcs,
channel.ChanID); err != nil {
return err
}
return nil
}
@ -508,46 +549,51 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
chanID *wire.OutPoint) (*OpenChannel, error) {
var err error
channel := &OpenChannel{
ChanID: chanID,
}
// First, read out the fields of the channel update less frequently.
if err := fetchChannelIDs(nodeChanBucket, channel); err != nil {
if err = fetchChannelIDs(nodeChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanCommitKeys(nodeChanBucket, channel); err != nil {
if err = fetchChanCommitKeys(nodeChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanCommitTxns(nodeChanBucket, channel); err != nil {
if err = fetchChanCommitTxns(nodeChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanFundingInfo(nodeChanBucket, channel); err != nil {
if err = fetchChanFundingInfo(nodeChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanEklremState(nodeChanBucket, channel); err != nil {
if err = fetchChanElkremState(nodeChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanDeliveryScripts(nodeChanBucket, channel); err != nil {
if err = fetchChanDeliveryScripts(nodeChanBucket, channel); err != nil {
return nil, err
}
channel.Htlcs, err = fetchCurrentHtlcs(nodeChanBucket, chanID)
if err != nil {
return nil, err
}
// With the existence of an open channel bucket with this node verified,
// perform a full read of the entire struct. Starting with the prefixed
// fields residing in the parent bucket.
if err := fetchChanCapacity(openChanBucket, channel); err != nil {
if err = fetchChanCapacity(openChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanMinFeePerKb(openChanBucket, channel); err != nil {
if err = fetchChanMinFeePerKb(openChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanNumUpdates(openChanBucket, channel); err != nil {
if err = fetchChanNumUpdates(openChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanTotalFlow(openChanBucket, channel); err != nil {
if err = fetchChanTotalFlow(openChanBucket, channel); err != nil {
return nil, err
}
if err := fetchChanNetFee(openChanBucket, channel); err != nil {
if err = fetchChanNetFee(openChanBucket, channel); err != nil {
return nil, err
}
@ -589,7 +635,7 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
if err := deleteChanFundingInfo(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanEklremState(nodeChanBucket, channelID); err != nil {
if err := deleteChanElkremState(nodeChanBucket, channelID); err != nil {
return err
}
if err := deleteChanDeliveryScripts(nodeChanBucket, channelID); err != nil {
@ -1118,7 +1164,7 @@ func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) err
return nil
}
func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
func putChanElkremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var bc bytes.Buffer
if err := writeOutpoint(&bc, channel.ChanID); err != nil {
return err
@ -1157,14 +1203,14 @@ func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error
return nodeChanBucket.Put(elkremKey, b.Bytes())
}
func deleteChanEklremState(nodeChanBucket *bolt.Bucket, chanID []byte) error {
func deleteChanElkremState(nodeChanBucket *bolt.Bucket, chanID []byte) error {
elkremKey := make([]byte, len(elkremStateKey)+len(chanID))
copy(elkremKey[:3], elkremStateKey)
copy(elkremKey[3:], chanID)
return nodeChanBucket.Delete(elkremKey)
}
func fetchChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
func fetchChanElkremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error {
var b bytes.Buffer
if err := writeOutpoint(&b, channel.ChanID); err != nil {
return err
@ -1286,7 +1332,7 @@ func serializeHTLC(w io.Writer, h *HTLC) error {
n += copy(buf[n:], h.RHash[:])
byteOrder.PutUint32(buf[n:], h.RefundTimeout)
n += 4
byteOrder.PutUint32(buf[n:], h.RevocationTimeout)
byteOrder.PutUint32(buf[n:], h.RevocationDelay)
n += 4
if _, err := w.Write(buf[:]); err != nil {
@ -1327,11 +1373,65 @@ func deserializeHTLC(r io.Reader) (*HTLC, error) {
if _, err := r.Read(scratch[:4]); err != nil {
return nil, err
}
h.RevocationTimeout = byteOrder.Uint32(scratch[:])
h.RevocationDelay = byteOrder.Uint32(scratch[:])
return h, nil
}
func makeHtlcKey(o *wire.OutPoint) [39]byte {
var (
n int
k [39]byte
)
// chk || txid || index
n += copy(k[:], currentHtlcKey)
n += copy(k[n:], o.Hash[:])
var scratch [4]byte
byteOrder.PutUint32(scratch[:], o.Index)
copy(k[n:], scratch[:])
return k
}
func putCurrentHtlcs(nodeChanBucket *bolt.Bucket, htlcs []*HTLC,
o *wire.OutPoint) error {
var b bytes.Buffer
for _, htlc := range htlcs {
if err := serializeHTLC(&b, htlc); err != nil {
return err
}
}
htlcKey := makeHtlcKey(o)
return nodeChanBucket.Put(htlcKey[:], b.Bytes())
}
func fetchCurrentHtlcs(nodeChanBucket *bolt.Bucket,
o *wire.OutPoint) ([]*HTLC, error) {
htlcKey := makeHtlcKey(o)
htlcBytes := nodeChanBucket.Get(htlcKey[:])
if htlcBytes == nil {
return nil, nil
}
// TODO(roasbeef): can preallocate here
var htlcs []*HTLC
htlcReader := bytes.NewReader(htlcBytes)
for htlcReader.Len() != 0 {
htlc, err := deserializeHTLC(htlcReader)
if err != nil {
return nil, err
}
htlcs = append(htlcs, htlc)
}
return htlcs, nil
}
func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error {
// TODO(roasbeef): could use compression here to reduce on-disk space.
var scratch [8]byte
@ -1401,39 +1501,41 @@ func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) {
return delta, nil
}
func makeLogKey(o *wire.OutPoint, updateNum uint32) [40]byte {
var (
scratch [4]byte
n int
k [40]byte
)
n += copy(k[:], o.Hash[:])
byteOrder.PutUint32(scratch[:], o.Index)
copy(k[n:], scratch[:])
n += 4
byteOrder.PutUint32(scratch[:], updateNum)
copy(k[n:], scratch[:])
return k
}
func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta,
chanPoint *wire.OutPoint) error {
// First construct the key for this particular log entry. The key for
// each newly added log entry is: channelPoint || stateNum.
var logEntrykey [36 + 4]byte
copy(logEntrykey[:], chanPoint.Hash[:])
var scratch [4]byte
byteOrder.PutUint32(scratch[:], delta.UpdateNum)
copy(logEntrykey[36:], scratch[:])
// With the key constructed, serialize the delta to raw bytes, then
// write the new state to disk.
var b bytes.Buffer
if err := serializeChannelDelta(&b, delta); err != nil {
return err
}
logEntrykey := makeLogKey(chanPoint, delta.UpdateNum)
return log.Put(logEntrykey[:], b.Bytes())
}
func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint,
updateNum uint32) (*ChannelDelta, error) {
// First construct the key for this particular log entry. The key for
// each newly added log entry is: channelPoint || stateNum.
// TODO(roasbeef): make into func..
var logEntrykey [36 + 4]byte
copy(logEntrykey[:], chanPoint.Hash[:])
var scratch [4]byte
byteOrder.PutUint32(scratch[:], updateNum)
copy(logEntrykey[36:], scratch[:])
logEntrykey := makeLogKey(chanPoint, updateNum)
deltaBytes := log.Get(logEntrykey[:])
if deltaBytes == nil {
return nil, fmt.Errorf("log entry not found")

View File

@ -171,10 +171,21 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
}
defer cleanUp()
// Create the test channel state, then add an additional fake HTLC
// before syncing to disk.
state, err := createTestChannelState(cdb)
if err != nil {
t.Fatalf("unable to create channel state: %v", err)
}
state.Htlcs = []*HTLC{
&HTLC{
Incoming: true,
Amt: 10,
RHash: key,
RefundTimeout: 1,
RevocationDelay: 2,
},
}
if err := state.FullSync(); err != nil {
t.Fatalf("unable to save and serialize channel state: %v", err)
}
@ -302,6 +313,10 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
if !bytes.Equal(newState.TheirCurrentRevocationHash[:], state.TheirCurrentRevocationHash[:]) {
t.Fatalf("revocation hashes don't match")
}
if !reflect.DeepEqual(state.Htlcs[0], newState.Htlcs[0]) {
t.Fatalf("htlcs don't match: %v vs %v", spew.Sdump(state.Htlcs[0]),
spew.Sdump(newState.Htlcs[0]))
}
// Finally to wrap up the test, delete the state of the channel within
// the database. This involves "closing" the channel which removes all
@ -324,7 +339,7 @@ func TestOpenChannelPutGetDelete(t *testing.T) {
}
}
func TestChannelStateUpdateLog(t *testing.T) {
func TestChannelStateTransition(t *testing.T) {
cdb, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("uanble to make test database: %v", err)
@ -350,11 +365,11 @@ func TestChannelStateUpdateLog(t *testing.T) {
incoming = true
}
htlc := &HTLC{
Incoming: incoming,
Amt: 50000,
RHash: key,
RefundTimeout: i,
RevocationTimeout: i + 2,
Incoming: incoming,
Amt: 50000,
RHash: key,
RefundTimeout: i,
RevocationDelay: i + 2,
}
htlcs = append(htlcs, htlc)
}
@ -372,13 +387,15 @@ func TestChannelStateUpdateLog(t *testing.T) {
Htlcs: htlcs,
UpdateNum: 1,
}
if err := channel.RecordChannelDelta(newTx, newSig, delta); err != nil {
t.Fatalf("unable to record channel delta: %v", err)
// First update the local node's broadcastable state.
if err := channel.UpdateCommitment(newTx, newSig, delta); err != nil {
t.Fatalf("unable to update commitment: %v", err)
}
// The balances, new update, and the changes to the fake commitment
// transaction along with the modified signature should all have been
// updated.
// The balances, new update, the HTLC's and the changes to the fake
// commitment transaction along with the modified signature should all
// have been updated.
nodeID := wire.ShaHash(channel.TheirLNID)
updatedChannel, err := cdb.FetchOpenChannels(&nodeID)
if err != nil {
@ -404,6 +421,25 @@ func TestChannelStateUpdateLog(t *testing.T) {
t.Fatalf("update # doesn't match: %v vs %v",
updatedChannel[0].NumUpdates, delta.UpdateNum)
}
for i := 0; i < len(updatedChannel[0].Htlcs); i++ {
originalHTLC := updatedChannel[0].Htlcs[i]
diskHTLC := channel.Htlcs[i]
if !reflect.DeepEqual(originalHTLC, diskHTLC) {
t.Fatalf("htlc's dont match: %v vs %v",
spew.Sdump(originalHTLC),
spew.Sdump(diskHTLC))
}
}
// Next, write to the log which tracks the necessary revocation state
// needed to rectify any fishy behavior by the remote party. Modify the
// current uncollapsed revocation state to simulate a state transition
// by the remote party.
newRevocation := bytes.Repeat([]byte{9}, 32)
copy(channel.TheirCurrentRevocationHash[:], newRevocation)
if err := channel.AppendToRevocationLog(delta); err != nil {
t.Fatalf("unable to append to revocation log: %v", err)
}
// We should be able to fetch the channel delta created above by it's
// update number with all the state properly reconstructed.
@ -432,4 +468,13 @@ func TestChannelStateUpdateLog(t *testing.T) {
spew.Sdump(diskHTLC))
}
}
// The revocation state stored on-disk should now also be identical.
updatedChannel, err = cdb.FetchOpenChannels(&nodeID)
if err != nil {
t.Fatalf("unable to fetch updated channel: %v", err)
}
if !bytes.Equal(updatedChannel[0].TheirCurrentRevocationHash[:],
newRevocation) {
t.Fatalf("revocation state wasn't synced!")
}
}