From 385818307bd8fc31d0c59c8c7ef63a6825008488 Mon Sep 17 00:00:00 2001 From: Olaoluwa Osuntokun Date: Thu, 9 Nov 2017 20:57:09 -0800 Subject: [PATCH] channeldb: update FetchOpenChannel to traverse new chain bucket --- channeldb/db.go | 150 +++++++++++++++++++++++++++++------------------- 1 file changed, 91 insertions(+), 59 deletions(-) diff --git a/channeldb/db.go b/channeldb/db.go index 2f5d7bc5..ea2edce2 100644 --- a/channeldb/db.go +++ b/channeldb/db.go @@ -9,7 +9,6 @@ import ( "sync" "github.com/boltdb/bolt" - "github.com/lightningnetwork/lnd/lnwire" "github.com/roasbeef/btcd/btcec" "github.com/roasbeef/btcd/wire" ) @@ -223,65 +222,80 @@ func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error) return nil } - // Within this top level bucket, fetch the bucket dedicated to storing - // open channel data specific to the remote node. + // Within this top level bucket, fetch the bucket dedicated to + // storing open channel data specific to the remote node. pub := nodeID.SerializeCompressed() nodeChanBucket := openChanBucket.Bucket(pub) if nodeChanBucket == nil { return nil } - // Finally, we both of the necessary buckets retrieved, fetch - // all the active channels related to this node. - nodeChannels, err := d.fetchNodeChannels(openChanBucket, - nodeChanBucket) - if err != nil { - return fmt.Errorf("unable to read channel for "+ - "node_key=%x: %v", pub, err) - } + // Next, we'll need to go down an additional layer in order to + // retrieve the channels for each chain the node knows of. + return nodeChanBucket.ForEach(func(chainHash, v []byte) error { + // If there's a value, it's not a bucket so ignore it. + if v != nil { + return nil + } - channels = nodeChannels - return nil + // If we've found a valid chainhash bucket, then we'll + // retrieve that so we can extract all the channels. + chainBucket := nodeChanBucket.Bucket(chainHash) + if chainBucket == nil { + return fmt.Errorf("unable to read bucket for "+ + "chain=%x", chainHash[:]) + } + + // Finally, we both of the necessary buckets retrieved, + // fetch all the active channels related to this node. + nodeChannels, err := d.fetchNodeChannels(chainBucket) + if err != nil { + return fmt.Errorf("unable to read channel for "+ + "chain_hash=%x, node_key=%x: %v", + chainHash[:], pub, err) + } + + channels = nodeChannels + return nil + }) }) return channels, err } -// fetchNodeChannels retrieves all active channels from the target -// nodeChanBucket. This function is typically used to fetch all the active -// channels related to a particular node. -func (d *DB) fetchNodeChannels(openChanBucket, - nodeChanBucket *bolt.Bucket) ([]*OpenChannel, error) { +// fetchNodeChannels retrieves all active channels from the target chainBucket +// which is under a node's dedicated channel bucket. This function is typically +// used to fetch all the active channels related to a particular node. +func (d *DB) fetchNodeChannels(chainBucket *bolt.Bucket) ([]*OpenChannel, error) { var channels []*OpenChannel - // Once we have the node's channel bucket, iterate through each - // item in the inner chan ID bucket. This bucket acts as an - // index for all channels we currently have open with this node. - nodeChanIDBucket := nodeChanBucket.Bucket(chanIDBucket[:]) - if nodeChanIDBucket == nil { - return nil, nil - } - err := nodeChanIDBucket.ForEach(func(k, v []byte) error { - if k == nil { + // A node may have channels on several chains, so for each known chain, + // we'll extract all the channels. + err := chainBucket.ForEach(func(chanPoint, v []byte) error { + // If there's a value, it's not a bucket so ignore it. + if v != nil { return nil } - outBytes := bytes.NewReader(k) - chanID := &wire.OutPoint{} - if err := readOutpoint(outBytes, chanID); err != nil { + // Once we've found a valid channel bucket, we'll extract it + // from the node's chain bucket. + chanBucket := chainBucket.Bucket(chanPoint) + + var outPoint wire.OutPoint + err := readOutpoint(bytes.NewReader(chanPoint), &outPoint) + if err != nil { return err } - - oChannel, err := fetchOpenChannel(openChanBucket, - nodeChanBucket, chanID) + oChannel, err := fetchOpenChannel(chanBucket, &outPoint) if err != nil { return fmt.Errorf("unable to read channel data for "+ - "chan_point=%v: %v", chanID, err) + "chan_point=%v: %v", outPoint, err) } oChannel.Db = d channels = append(channels, oChannel) + return nil }) if err != nil { @@ -297,17 +311,17 @@ func (d *DB) FetchAllChannels() ([]*OpenChannel, error) { return fetchChannels(d, false) } -// FetchPendingChannels will return channels that have completed the process -// of generating and broadcasting funding transactions, but whose funding +// FetchPendingChannels will return channels that have completed the process of +// generating and broadcasting funding transactions, but whose funding // transactions have yet to be confirmed on the blockchain. func (d *DB) FetchPendingChannels() ([]*OpenChannel, error) { return fetchChannels(d, true) } // fetchChannels attempts to retrieve channels currently stored in the -// database. The pendingOnly parameter determines whether only pending -// channels will be returned. If no active channels exist within the network, -// then ErrNoActiveChannels is returned. +// database. The pendingOnly parameter determines whether only pending channels +// will be returned. If no active channels exist within the network, then +// ErrNoActiveChannels is returned. func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) { var channels []*OpenChannel @@ -319,39 +333,57 @@ func fetchChannels(d *DB, pendingOnly bool) ([]*OpenChannel, error) { return ErrNoActiveChannels } - // Next, fetch the bucket dedicated to storing metadata - // related to all nodes. All keys within this bucket are the - // serialized public keys of all our direct counterparties. + // Next, fetch the bucket dedicated to storing metadata related + // to all nodes. All keys within this bucket are the serialized + // public keys of all our direct counterparties. nodeMetaBucket := tx.Bucket(nodeInfoBucket) if nodeMetaBucket == nil { return fmt.Errorf("node bucket not created") } // Finally for each node public key in the bucket, fetch all - // the channels related to this particualr ndoe. + // the channels related to this particular node. return nodeMetaBucket.ForEach(func(k, v []byte) error { nodeChanBucket := openChanBucket.Bucket(k) if nodeChanBucket == nil { return nil } - nodeChannels, err := d.fetchNodeChannels(openChanBucket, - nodeChanBucket) - if err != nil { - return fmt.Errorf("unable to read channel for "+ - "node_key=%x: %v", k, err) - } - // TODO(roasbeef): simplify - if pendingOnly { - for _, channel := range nodeChannels { - if channel.IsPending { - channels = append(channels, channel) - } + return nodeChanBucket.ForEach(func(chainHash, v []byte) error { + // If there's a value, it's not a bucket so + // ignore it. + if v != nil { + return nil } - } else { - channels = append(channels, nodeChannels...) - } - return nil + + // If we've found a valid chainhash bucket, + // then we'll retrieve that so we can extract + // all the channels. + chainBucket := nodeChanBucket.Bucket(chainHash) + if chainBucket == nil { + return fmt.Errorf("unable to read "+ + "bucket for chain=%x", chainHash[:]) + } + + nodeChans, err := d.fetchNodeChannels(chainBucket) + if err != nil { + return fmt.Errorf("unable to read "+ + "channel for chain_hash=%x, "+ + "node_key=%x: %v", chainHash[:], k, err) + } + // TODO(roasbeef): simplify + if pendingOnly { + for _, channel := range nodeChans { + if channel.IsPending { + channels = append(channels, channel) + } + } + } else { + channels = append(channels, nodeChans...) + } + return nil + }) + }) })