From e60778a867a1e03992addf02c3fa83eea5f9bbed Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Tue, 6 Sep 2016 19:17:34 -0700 Subject: [PATCH] channeldb: split RecordChannelDelta into two distinct methods MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit This commit splits the previously added RecordChannelDelta method into two distinct methods: UpdateCommitment and AppendToRevocationLog. The former method is to be used once the local party revokes their current commitment, and the latter method to be used once the remote party revokes their current commitment. With the addition of the UpdateCommitment method, the active HTLC’s from the local node’s point of view are now persisted to disk. Snapshots returned by the channel now also includes the current set of active HTLC’s. In order to maintain thread safety the channels mutex is now grabbed within methods which modify/read state but don’t do so solely via a boltDB transaction. The tests have been updated to account for the storage of HTLC’s needed in order to assert proper behavior. --- channeldb/channel.go | 284 ++++++++++++++++++++++++++------------ channeldb/channel_test.go | 67 +++++++-- 2 files changed, 249 insertions(+), 102 deletions(-) diff --git a/channeldb/channel.go b/channeldb/channel.go index b3230550..0d7e6cd6 100644 --- a/channeldb/channel.go +++ b/channeldb/channel.go @@ -75,6 +75,10 @@ var ( // commitment transactions in addition to the csvDelay for both. commitTxnsKey = []byte("ctk") + // currentHtlcKey stores the set of fully locked-in HTLC's on our + // latest commitment state. + currentHtlcKey = []byte("chk") + // fundingTxnKey stroes the funding tx, our encrypted multi-sig key, // and finally 2-of-2 multisig redeem script. fundingTxnKey = []byte("fsk") @@ -100,12 +104,8 @@ type OpenChannel struct { TheirLNID [wire.HashSize]byte // The ID of a channel is the txid of the funding transaction. - ChanID *wire.OutPoint - + ChanID *wire.OutPoint MinFeePerKb btcutil.Amount - // Our reserve. Assume symmetric reserve amounts. Only needed if the - // funding type is CLTV. - //ReserveAmount btcutil.Amount // Keys for both sides to be used for the commitment transactions. OurCommitKey *btcec.PublicKey @@ -152,6 +152,8 @@ type OpenChannel struct { TotalNetFees uint64 // TODO(roasbeef): total fees paid too? CreationTime time.Time // TODO(roasbeef): last update time? + Htlcs []*HTLC + // TODO(roasbeef): eww Db *DB @@ -165,6 +167,9 @@ type OpenChannel struct { // NOTE: This method requires an active EncryptorDecryptor to be registered in // order to encrypt sensitive information. func (c *OpenChannel) FullSync() error { + c.Lock() + defer c.Unlock() + return c.Db.store.Update(func(tx *bolt.Tx) error { // TODO(roasbeef): add helper funcs to create scoped update // First fetch the top level bucket which stores all data related to @@ -192,34 +197,61 @@ func (c *OpenChannel) FullSync() error { return err } if chanIDBucket.Get(b.Bytes()) == nil { - chanIDBucket.Put(b.Bytes(), nil) + if err := chanIDBucket.Put(b.Bytes(), nil); err != nil { + return err + } } return putOpenChannel(chanBucket, nodeChanBucket, c) }) } -// SyncRevocation writes to disk the current revocation state of the channel. -// The revocation state is defined as the current elkrem receiver, and the -// latest unrevoked key+hash for the remote party. -func (c *OpenChannel) SyncRevocation() error { +// UpdateCommitment updates the on-disk state of our currently broadcastable +// commitment state. This method is to be called once we have revoked our prior +// commitment state, accepting the new state as defined by the passed +// parameters. +func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx, + newSig []byte, delta *ChannelDelta) error { + + c.Lock() + defer c.Unlock() + return c.Db.store.Update(func(tx *bolt.Tx) error { - // First fetch the top level bucket which stores all data related to - // current, active channels. chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) if err != nil { return err } - // Within this top level bucket, fetch the bucket dedicated to storing - // open channel data specific to the remote node. - nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(c.TheirLNID[:]) + id := c.TheirLNID[:] + nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) if err != nil { return err } - // Sync the current elkrem state to disk. - if err := putChanEklremState(nodeChanBucket, c); err != nil { + // TODO(roasbeef): modify the funcs below to take values + // directly, otherwise need to roll back to prior state. Could + // also make copy above, then modify to pass in. + c.OurCommitTx = newCommitment + c.OurCommitSig = newSig + c.OurBalance = delta.LocalBalance + c.TheirBalance = delta.RemoteBalance + c.NumUpdates = uint64(delta.UpdateNum) + c.Htlcs = delta.Htlcs + + // First we'll write out the current latest dynamic channel + // state: the current channel balance, the number of updates, + // and our latest commitment transaction+sig. + // TODO(roasbeef): re-make schema s.t this is a single put + if err := putChanCapacity(chanBucket, c); err != nil { + return err + } + if err := putChanNumUpdates(chanBucket, c); err != nil { + return err + } + if err := putChanCommitTxns(nodeChanBucket, c); err != nil { + return err + } + if err := putCurrentHtlcs(nodeChanBucket, delta.Htlcs, c.ChanID); err != nil { return err } @@ -245,11 +277,24 @@ type HTLC struct { // must wait before reclaiming the funds in limbo. RefundTimeout uint32 - // RevocationTimeout is the relative timeout the party who broadcasts + // RevocationDelay is the relative timeout the party who broadcasts // the commitment transaction must wait before being able to fully // sweep the funds on-chain in the case of a unilateral channel // closure. - RevocationTimeout uint32 + RevocationDelay uint32 +} + +// Copy returns a full copy of the target HTLC. +func (h *HTLC) Copy() HTLC { + clone := HTLC{ + Incoming: h.Incoming, + Amt: h.Amt, + RefundTimeout: h.RefundTimeout, + RevocationDelay: h.RevocationDelay, + } + copy(clone.RHash[:], h.RHash[:]) + + return clone } // ChannelDelta is a snapshot of the commitment state at a particular point in @@ -263,13 +308,12 @@ type ChannelDelta struct { Htlcs []*HTLC } -// RecordChannelDelta records the new state transition within an on-disk -// append-only log which records all state transitions. Additionally, the -// internal balances and update counter of the target OpenChannel are updated -// accordingly based on the passed delta. -func (c *OpenChannel) RecordChannelDelta(newCommitment *wire.MsgTx, - newSig []byte, delta *ChannelDelta) error { - +// AppendToRevocationLog records the new state transition within an on-disk +// append-only log which records all state transitions by the remote peer. In +// the case of an uncooperative broadcast of a prior state by the remote peer, +// this log can be consulted in order to reconstruct the state needed to +// rectify the situation. +func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error { return c.Db.store.Update(func(tx *bolt.Tx) error { chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket) if err != nil { @@ -278,32 +322,19 @@ func (c *OpenChannel) RecordChannelDelta(newCommitment *wire.MsgTx, id := c.TheirLNID[:] nodeChanBucket, err := chanBucket.CreateBucketIfNotExists(id) - if nodeChanBucket == nil { - return ErrNoActiveChannels - } - - // TODO(roasbeef): revisit in-line mutation - c.OurCommitTx = newCommitment - c.OurBalance = delta.LocalBalance - c.TheirBalance = delta.RemoteBalance - c.OurCommitSig = newSig - c.NumUpdates = uint64(delta.UpdateNum) - - // First we'll write out the current latest dynamic channel - // state: the current channel balance, the number of updates, - // and our latest commitment transaction+sig. - if err := putChanCapacity(chanBucket, c); err != nil { - return err - } - if err := putChanNumUpdates(chanBucket, c); err != nil { - return err - } - if err := putChanCommitTxns(nodeChanBucket, c); err != nil { + if err != nil { return err } - // With the current state updated, append a new log entry - // recording this the delta of this state transition. + // Persist the latest elkrem state to disk as the remote peer + // has just added to our local elkrem receiver, and given us a + // new pending revocation key. + if err := putChanElkremState(nodeChanBucket, c); err != nil { + return err + } + + // With the current elkrem state updated, append a new log + // entry recording this the delta of this state transition. // TODO(roasbeef): could make the deltas relative, would save // space, but then tradeoff for more disk-seeks to recover the // full state. @@ -355,6 +386,7 @@ func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error) // entails deleting all saved state within the database concerning this // channel, as well as created a small channel summary for record keeping // purposes. +// TODO(roasbeef): delete on-disk set of HTLC's func (c *OpenChannel) CloseChannel() error { return c.Db.store.Update(func(tx *bolt.Tx) error { // First fetch the top level bucket which stores all data related to @@ -415,15 +447,16 @@ type ChannelSnapshot struct { TotalSatoshisSent uint64 TotalSatoshisReceived uint64 - // TODO(roasbeef): fee stuff - updateNum uint64 - channel *OpenChannel + Htlcs []HTLC } // Snapshot returns a read-only snapshot of the current channel state. This // snapshot includes information concerning the current settled balance within // the channel, meta-data detailing total flows, and any outstanding HTLCs. func (c *OpenChannel) Snapshot() *ChannelSnapshot { + c.RLock() + defer c.RUnlock() + snapshot := &ChannelSnapshot{ ChannelPoint: c.ChanID, Capacity: c.Capacity, @@ -435,8 +468,12 @@ func (c *OpenChannel) Snapshot() *ChannelSnapshot { } copy(snapshot.RemoteID[:], c.TheirLNID[:]) - // TODO(roasbeef): cache current channel delta in memory, either merge - // or replace with ChannelSnapshot + // Copy over the current set of HTLC's to ensure the caller can't + // mutate our internal state. + snapshot.Htlcs = make([]HTLC, len(c.Htlcs)) + for i, h := range c.Htlcs { + snapshot.Htlcs[i] = h.Copy() + } return snapshot } @@ -491,12 +528,16 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := putChanFundingInfo(nodeChanBucket, channel); err != nil { return err } - if err := putChanEklremState(nodeChanBucket, channel); err != nil { + if err := putChanElkremState(nodeChanBucket, channel); err != nil { return err } if err := putChanDeliveryScripts(nodeChanBucket, channel); err != nil { return err } + if err := putCurrentHtlcs(nodeChanBucket, channel.Htlcs, + channel.ChanID); err != nil { + return err + } return nil } @@ -508,46 +549,51 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, chanID *wire.OutPoint) (*OpenChannel, error) { + var err error channel := &OpenChannel{ ChanID: chanID, } // First, read out the fields of the channel update less frequently. - if err := fetchChannelIDs(nodeChanBucket, channel); err != nil { + if err = fetchChannelIDs(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanCommitKeys(nodeChanBucket, channel); err != nil { + if err = fetchChanCommitKeys(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanCommitTxns(nodeChanBucket, channel); err != nil { + if err = fetchChanCommitTxns(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanFundingInfo(nodeChanBucket, channel); err != nil { + if err = fetchChanFundingInfo(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanEklremState(nodeChanBucket, channel); err != nil { + if err = fetchChanElkremState(nodeChanBucket, channel); err != nil { return nil, err } - if err := fetchChanDeliveryScripts(nodeChanBucket, channel); err != nil { + if err = fetchChanDeliveryScripts(nodeChanBucket, channel); err != nil { + return nil, err + } + channel.Htlcs, err = fetchCurrentHtlcs(nodeChanBucket, chanID) + if err != nil { return nil, err } // With the existence of an open channel bucket with this node verified, // perform a full read of the entire struct. Starting with the prefixed // fields residing in the parent bucket. - if err := fetchChanCapacity(openChanBucket, channel); err != nil { + if err = fetchChanCapacity(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanMinFeePerKb(openChanBucket, channel); err != nil { + if err = fetchChanMinFeePerKb(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanNumUpdates(openChanBucket, channel); err != nil { + if err = fetchChanNumUpdates(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanTotalFlow(openChanBucket, channel); err != nil { + if err = fetchChanTotalFlow(openChanBucket, channel); err != nil { return nil, err } - if err := fetchChanNetFee(openChanBucket, channel); err != nil { + if err = fetchChanNetFee(openChanBucket, channel); err != nil { return nil, err } @@ -589,7 +635,7 @@ func deleteOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket, if err := deleteChanFundingInfo(nodeChanBucket, channelID); err != nil { return err } - if err := deleteChanEklremState(nodeChanBucket, channelID); err != nil { + if err := deleteChanElkremState(nodeChanBucket, channelID); err != nil { return err } if err := deleteChanDeliveryScripts(nodeChanBucket, channelID); err != nil { @@ -1118,7 +1164,7 @@ func fetchChanFundingInfo(nodeChanBucket *bolt.Bucket, channel *OpenChannel) err return nil } -func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { +func putChanElkremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var bc bytes.Buffer if err := writeOutpoint(&bc, channel.ChanID); err != nil { return err @@ -1157,14 +1203,14 @@ func putChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error return nodeChanBucket.Put(elkremKey, b.Bytes()) } -func deleteChanEklremState(nodeChanBucket *bolt.Bucket, chanID []byte) error { +func deleteChanElkremState(nodeChanBucket *bolt.Bucket, chanID []byte) error { elkremKey := make([]byte, len(elkremStateKey)+len(chanID)) copy(elkremKey[:3], elkremStateKey) copy(elkremKey[3:], chanID) return nodeChanBucket.Delete(elkremKey) } -func fetchChanEklremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { +func fetchChanElkremState(nodeChanBucket *bolt.Bucket, channel *OpenChannel) error { var b bytes.Buffer if err := writeOutpoint(&b, channel.ChanID); err != nil { return err @@ -1286,7 +1332,7 @@ func serializeHTLC(w io.Writer, h *HTLC) error { n += copy(buf[n:], h.RHash[:]) byteOrder.PutUint32(buf[n:], h.RefundTimeout) n += 4 - byteOrder.PutUint32(buf[n:], h.RevocationTimeout) + byteOrder.PutUint32(buf[n:], h.RevocationDelay) n += 4 if _, err := w.Write(buf[:]); err != nil { @@ -1327,11 +1373,65 @@ func deserializeHTLC(r io.Reader) (*HTLC, error) { if _, err := r.Read(scratch[:4]); err != nil { return nil, err } - h.RevocationTimeout = byteOrder.Uint32(scratch[:]) + h.RevocationDelay = byteOrder.Uint32(scratch[:]) return h, nil } +func makeHtlcKey(o *wire.OutPoint) [39]byte { + var ( + n int + k [39]byte + ) + + // chk || txid || index + n += copy(k[:], currentHtlcKey) + n += copy(k[n:], o.Hash[:]) + var scratch [4]byte + byteOrder.PutUint32(scratch[:], o.Index) + copy(k[n:], scratch[:]) + + return k +} + +func putCurrentHtlcs(nodeChanBucket *bolt.Bucket, htlcs []*HTLC, + o *wire.OutPoint) error { + var b bytes.Buffer + + for _, htlc := range htlcs { + if err := serializeHTLC(&b, htlc); err != nil { + return err + } + } + + htlcKey := makeHtlcKey(o) + return nodeChanBucket.Put(htlcKey[:], b.Bytes()) +} + +func fetchCurrentHtlcs(nodeChanBucket *bolt.Bucket, + o *wire.OutPoint) ([]*HTLC, error) { + + htlcKey := makeHtlcKey(o) + htlcBytes := nodeChanBucket.Get(htlcKey[:]) + if htlcBytes == nil { + return nil, nil + } + + // TODO(roasbeef): can preallocate here + var htlcs []*HTLC + htlcReader := bytes.NewReader(htlcBytes) + for htlcReader.Len() != 0 { + htlc, err := deserializeHTLC(htlcReader) + if err != nil { + return nil, err + } + + htlcs = append(htlcs, htlc) + } + + return htlcs, nil +} + func serializeChannelDelta(w io.Writer, delta *ChannelDelta) error { // TODO(roasbeef): could use compression here to reduce on-disk space. var scratch [8]byte @@ -1401,39 +1501,41 @@ func deserializeChannelDelta(r io.Reader) (*ChannelDelta, error) { return delta, nil } +func makeLogKey(o *wire.OutPoint, updateNum uint32) [40]byte { + var ( + scratch [4]byte + n int + k [40]byte + ) + + n += copy(k[:], o.Hash[:]) + + byteOrder.PutUint32(scratch[:], o.Index) + copy(k[n:], scratch[:]) + n += 4 + + byteOrder.PutUint32(scratch[:], updateNum) + copy(k[n:], scratch[:]) + + return k +} + func appendChannelLogEntry(log *bolt.Bucket, delta *ChannelDelta, chanPoint *wire.OutPoint) error { - // First construct the key for this particular log entry. The key for - // each newly added log entry is: channelPoint || stateNum. - var logEntrykey [36 + 4]byte - copy(logEntrykey[:], chanPoint.Hash[:]) - var scratch [4]byte - byteOrder.PutUint32(scratch[:], delta.UpdateNum) - copy(logEntrykey[36:], scratch[:]) - - // With the key constructed, serialize the delta to raw bytes, then - // write the new state to disk. var b bytes.Buffer if err := serializeChannelDelta(&b, delta); err != nil { return err } + logEntrykey := makeLogKey(chanPoint, delta.UpdateNum) return log.Put(logEntrykey[:], b.Bytes()) } func fetchChannelLogEntry(log *bolt.Bucket, chanPoint *wire.OutPoint, updateNum uint32) (*ChannelDelta, error) { - // First construct the key for this particular log entry. The key for - // each newly added log entry is: channelPoint || stateNum. - // TODO(roasbeef): make into func.. - var logEntrykey [36 + 4]byte - copy(logEntrykey[:], chanPoint.Hash[:]) - var scratch [4]byte - byteOrder.PutUint32(scratch[:], updateNum) - copy(logEntrykey[36:], scratch[:]) - + logEntrykey := makeLogKey(chanPoint, updateNum) deltaBytes := log.Get(logEntrykey[:]) if deltaBytes == nil { return nil, fmt.Errorf("log entry not found") diff --git a/channeldb/channel_test.go b/channeldb/channel_test.go index 5f5e09f3..41419d71 100644 --- a/channeldb/channel_test.go +++ b/channeldb/channel_test.go @@ -171,10 +171,21 @@ func TestOpenChannelPutGetDelete(t *testing.T) { } defer cleanUp() + // Create the test channel state, then add an additional fake HTLC + // before syncing to disk. state, err := createTestChannelState(cdb) if err != nil { t.Fatalf("unable to create channel state: %v", err) } + state.Htlcs = []*HTLC{ + &HTLC{ + Incoming: true, + Amt: 10, + RHash: key, + RefundTimeout: 1, + RevocationDelay: 2, + }, + } if err := state.FullSync(); err != nil { t.Fatalf("unable to save and serialize channel state: %v", err) } @@ -302,6 +313,10 @@ func TestOpenChannelPutGetDelete(t *testing.T) { if !bytes.Equal(newState.TheirCurrentRevocationHash[:], state.TheirCurrentRevocationHash[:]) { t.Fatalf("revocation hashes don't match") } + if !reflect.DeepEqual(state.Htlcs[0], newState.Htlcs[0]) { + t.Fatalf("htlcs don't match: %v vs %v", spew.Sdump(state.Htlcs[0]), + spew.Sdump(newState.Htlcs[0])) + } // Finally to wrap up the test, delete the state of the channel within // the database. This involves "closing" the channel which removes all @@ -324,7 +339,7 @@ func TestOpenChannelPutGetDelete(t *testing.T) { } } -func TestChannelStateUpdateLog(t *testing.T) { +func TestChannelStateTransition(t *testing.T) { cdb, cleanUp, err := makeTestDB() if err != nil { t.Fatalf("uanble to make test database: %v", err) @@ -350,11 +365,11 @@ func TestChannelStateUpdateLog(t *testing.T) { incoming = true } htlc := &HTLC{ - Incoming: incoming, - Amt: 50000, - RHash: key, - RefundTimeout: i, - RevocationTimeout: i + 2, + Incoming: incoming, + Amt: 50000, + RHash: key, + RefundTimeout: i, + RevocationDelay: i + 2, } htlcs = append(htlcs, htlc) } @@ -372,13 +387,15 @@ func TestChannelStateUpdateLog(t *testing.T) { Htlcs: htlcs, UpdateNum: 1, } - if err := channel.RecordChannelDelta(newTx, newSig, delta); err != nil { - t.Fatalf("unable to record channel delta: %v", err) + + // First update the local node's broadcastable state. + if err := channel.UpdateCommitment(newTx, newSig, delta); err != nil { + t.Fatalf("unable to update commitment: %v", err) } - // The balances, new update, and the changes to the fake commitment - // transaction along with the modified signature should all have been - // updated. + // The balances, new update, the HTLC's and the changes to the fake + // commitment transaction along with the modified signature should all + // have been updated. nodeID := wire.ShaHash(channel.TheirLNID) updatedChannel, err := cdb.FetchOpenChannels(&nodeID) if err != nil { @@ -404,6 +421,25 @@ func TestChannelStateUpdateLog(t *testing.T) { t.Fatalf("update # doesn't match: %v vs %v", updatedChannel[0].NumUpdates, delta.UpdateNum) } + for i := 0; i < len(updatedChannel[0].Htlcs); i++ { + originalHTLC := updatedChannel[0].Htlcs[i] + diskHTLC := channel.Htlcs[i] + if !reflect.DeepEqual(originalHTLC, diskHTLC) { + t.Fatalf("htlc's dont match: %v vs %v", + spew.Sdump(originalHTLC), + spew.Sdump(diskHTLC)) + } + } + + // Next, write to the log which tracks the necessary revocation state + // needed to rectify any fishy behavior by the remote party. Modify the + // current uncollapsed revocation state to simulate a state transition + // by the remote party. + newRevocation := bytes.Repeat([]byte{9}, 32) + copy(channel.TheirCurrentRevocationHash[:], newRevocation) + if err := channel.AppendToRevocationLog(delta); err != nil { + t.Fatalf("unable to append to revocation log: %v", err) + } // We should be able to fetch the channel delta created above by it's // update number with all the state properly reconstructed. @@ -432,4 +468,13 @@ func TestChannelStateUpdateLog(t *testing.T) { spew.Sdump(diskHTLC)) } } + // The revocation state stored on-disk should now also be identical. + updatedChannel, err = cdb.FetchOpenChannels(&nodeID) + if err != nil { + t.Fatalf("unable to fetch updated channel: %v", err) + } + if !bytes.Equal(updatedChannel[0].TheirCurrentRevocationHash[:], + newRevocation) { + t.Fatalf("revocation state wasn't synced!") + } }