channeldb: add storage of an on-disk directed channel graph

This commit introduces a new capability to the database: storage of an
on-disk directed channel graph. The on-disk representation of the graph
within boltdb is essentially a modified adjacency list which separates
the storage of the edge’s existence and the storage of the edge
information itself.

The new objects provided within he ChannelGraph carry an API which
facilitates easy graph traversal via their ForEach* methods. As a
result, path finding algorithms will be able to be expressed in a
natural way using the range methods as a for-range language extension
within Go.

Additionally caching will likely be added either at this layer or the
layer above (the RoutingManager) in order keep queries and outgoing
payments speedy. In a future commit a new set of RPC’s to query the
state of a particular edge or node will also be added.
This commit is contained in:
Olaoluwa Osuntokun 2016-12-07 22:47:01 -08:00
parent 06347664fc
commit e39dc9eec1
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
4 changed files with 1653 additions and 1 deletions

View File

@ -116,6 +116,15 @@ func (d *DB) Wipe() error {
return err return err
} }
err = tx.DeleteBucket(nodeBucket)
if err != nil && err != bolt.ErrBucketNotFound {
return err
}
err = tx.DeleteBucket(edgeBucket)
if err != nil && err != bolt.ErrBucketNotFound {
return err
}
return nil return nil
}) })
} }
@ -154,6 +163,13 @@ func createChannelDB(dbPath string) error {
return err return err
} }
if _, err := tx.CreateBucket(nodeBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(edgeBucket); err != nil {
return err
}
if _, err := tx.CreateBucket(metaBucket); err != nil { if _, err := tx.CreateBucket(metaBucket); err != nil {
return err return err
} }
@ -349,6 +365,11 @@ func (d *DB) syncVersions(versions []version) error {
}) })
} }
// ChannelGraph returns a new instance of the directed channel graph.
func (d *DB) ChannelGraph() *ChannelGraph {
return &ChannelGraph{d}
}
func getLatestDBVersion(versions []version) uint32 { func getLatestDBVersion(versions []version) uint32 {
return versions[len(versions)-1].number return versions[len(versions)-1].number
} }

View File

@ -14,5 +14,16 @@ var (
ErrDuplicateInvoice = fmt.Errorf("invoice with payment hash already exists") ErrDuplicateInvoice = fmt.Errorf("invoice with payment hash already exists")
ErrNodeNotFound = fmt.Errorf("link node with target identity not found") ErrNodeNotFound = fmt.Errorf("link node with target identity not found")
ErrMetaNotFound = fmt.Errorf("unable to locate meta information") ErrMetaNotFound = fmt.Errorf("unable to locate meta information")
ErrGraphNotFound = fmt.Errorf("graph bucket not initialized")
ErrGraphNodesNotFound = fmt.Errorf("no graph nodes exist")
ErrGraphNoEdgesFound = fmt.Errorf("no graph edges exist")
ErrGraphNodeNotFound = fmt.Errorf("unable to find node")
ErrEdgeNotFound = fmt.Errorf("edge for chanID not found")
ErrNodeAliasNotFound = fmt.Errorf("alias for node not found")
ErrSourceNodeNotSet = fmt.Errorf("source node does not exist")
) )

1113
channeldb/graph.go Normal file

File diff suppressed because it is too large Load Diff

507
channeldb/graph_test.go Normal file
View File

@ -0,0 +1,507 @@
package channeldb
import (
"bytes"
"fmt"
"image/color"
prand "math/rand"
"net"
"reflect"
"testing"
"time"
"github.com/btcsuite/fastsha256"
"github.com/roasbeef/btcd/btcec"
"github.com/roasbeef/btcd/wire"
"github.com/roasbeef/btcutil"
)
var (
testAddr, _ = net.ResolveTCPAddr("tcp", "10.0.0.1:9000")
)
func createTestVertex(db *DB) (*LightningNode, error) {
updateTime := prand.Int63()
priv, err := btcec.NewPrivateKey(btcec.S256())
if err != nil {
return nil, err
}
pub := priv.PubKey().SerializeCompressed()
return &LightningNode{
LastUpdate: time.Unix(updateTime, 0),
Address: testAddr,
PubKey: priv.PubKey(),
Color: color.RGBA{1, 2, 3, 0},
Alias: "kek" + string(pub[:]),
db: db,
}, nil
}
func TestNodeInsertionAndDeletion(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test basic insertion/deletion for vertexes from the
// graph, so we'll create a test vertex to start with.
_, testPub := btcec.PrivKeyFromBytes(btcec.S256(), key[:])
node := &LightningNode{
LastUpdate: time.Unix(1232342, 0),
Address: testAddr,
PubKey: testPub,
Color: color.RGBA{1, 2, 3, 0},
Alias: "kek",
db: db,
}
// First, insert the node into the graph DB. This should succeed
// without any errors.
if err := graph.AddLightningNode(node); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Next, fetch the node from the database to ensure everything was
// serialized properly.
dbNode, err := graph.FetchLightningNode(testPub)
if err != nil {
t.Fatalf("unable to locate node: %v", err)
}
// The two nodes should match exactly!
if !reflect.DeepEqual(node, dbNode) {
t.Fatalf("retrieved node doesn't match: expected %#v\n, got %#v\n",
node, dbNode)
}
// Next, delete the node from the graph, this should purge all data
// related to the node.
if err := graph.DeleteLightningNode(testPub); err != nil {
t.Fatalf("unable to delete node; %v", err)
}
// Finally, attempt to fetch the node again. This should fail as the
// node should've been deleted from the database.
_, err = graph.FetchLightningNode(testPub)
if err != ErrGraphNodeNotFound {
t.Fatalf("fetch after delete should fail!")
}
}
func TestAliasLookup(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test the alias index within the database, so first
// create a new test node.
testNode, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
// Add the node to the graph's database, this should also insert an
// entry into the alias index for this node.
if err := graph.AddLightningNode(testNode); err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Next, attempt to lookup the alias. The alias should exactly match
// the one which the test node was assigned.
dbAlias, err := graph.LookupAlias(testNode.PubKey)
if err != nil {
t.Fatalf("unable to find alias: %v", err)
}
if dbAlias != testNode.Alias {
t.Fatalf("aliases don't match, expected %v got %v",
testNode.Alias, dbAlias)
}
// Ensure that looking up a non-existent alias results in an error.
node, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
_, err = graph.LookupAlias(node.PubKey)
if err != ErrNodeAliasNotFound {
t.Fatalf("alias lookup should fail for non-existent pubkey")
}
}
func TestSourceNode(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test the setting/getting of the source node, so we
// first create a fake node to use within the test.
testNode, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
// Attempt to fetch the source node, this should return an error as the
// source node hasn't yet been set.
if _, err := graph.SourceNode(); err != ErrSourceNodeNotSet {
t.Fatalf("source node shouldn't be set in new graph")
}
// Set the source the source node, this should insert the node into the
// database in a special way indicating it's the source node.
if err := graph.SetSourceNode(testNode); err != nil {
t.Fatalf("unable to set source node: %v", err)
}
// Retrieve the source node from the database, it should exactly match
// the one we set above.
sourceNode, err := graph.SourceNode()
if err != nil {
t.Fatalf("unable to fetch source node: %v", err)
}
if !reflect.DeepEqual(testNode, sourceNode) {
t.Fatalf("nodes don't match, expected %#v \n got %#v",
testNode, sourceNode)
}
}
func TestEdgeInsertionDeletion(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test the insertion/deletion of edges, so we create two
// vertexes to connect.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
// In in addition to the fake vertexes we create some fake channel
// identifiers.
chanID := uint64(prand.Int63())
outpoint := wire.OutPoint{
Hash: rev,
Index: 9,
}
// Add the new edge to the database, this should proceed without any
// errors.
if err := graph.AddChannelEdge(node1.PubKey, node2.PubKey, &outpoint,
chanID); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}
// Check for existence of the edge within the database, it should be
// found.
found, err := graph.HasChannelEdge(chanID)
if err != nil {
t.Fatalf("unable to query for edge: %v", err)
} else if !found {
t.Fatalf("graph should have of inserted edge")
}
// Next, attempt to delete the edge from the database, again this
// should proceed without any issues.
if err := graph.DeleteChannelEdge(&outpoint); err != nil {
t.Fatalf("unable to delete edge: %v", err)
}
// Finally, attempt to delete a (now) non-existent edge within the
// database, this should result in an error.
err = graph.DeleteChannelEdge(&outpoint)
if err != ErrEdgeNotFound {
t.Fatalf("deleting a non-existent edge should fail!")
}
}
func TestEdgeInfoUpdates(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test the update of edges inserted into the database, so
// we create two vertexes to connect.
node1, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node1); err != nil {
t.Fatalf("unable to add node: %v", err)
}
node2, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create test node: %v", err)
}
if err := graph.AddLightningNode(node2); err != nil {
t.Fatalf("unable to add node: %v", err)
}
var (
firstNode *LightningNode
secondNode *LightningNode
)
node1Bytes := node1.PubKey.SerializeCompressed()
node2Bytes := node2.PubKey.SerializeCompressed()
if bytes.Compare(node1Bytes, node2Bytes) == -1 {
firstNode = node1
secondNode = node2
} else {
firstNode = node2
secondNode = node1
}
// In in addition to the fake vertexes we create some fake channel
// identifiers.
chanID := uint64(prand.Int63())
outpoint := wire.OutPoint{
Hash: rev,
Index: 9,
}
// Add the new edge to the database, this should proceed without any
// errors.
if err := graph.AddChannelEdge(node1.PubKey, node2.PubKey, &outpoint,
chanID); err != nil {
t.Fatalf("unable to create channel edge: %v", err)
}
// With the edge added, we can now create some fake edge information to
// update for both edges.
edge1 := &ChannelEdge{
ChannelID: chanID,
ChannelPoint: outpoint,
LastUpdate: time.Unix(433453, 0),
Flags: 0,
Expiry: 99,
MinHTLC: 2342135,
FeeBaseMSat: 4352345,
FeeProportionalMillionths: 3452352,
Capacity: 9903453,
Node: secondNode,
db: db,
}
edge2 := &ChannelEdge{
ChannelID: chanID,
ChannelPoint: outpoint,
LastUpdate: time.Unix(124234, 0),
Flags: 1,
Expiry: 99,
MinHTLC: 2342135,
FeeBaseMSat: 4352345,
FeeProportionalMillionths: 90392423,
Capacity: 324523,
Node: firstNode,
db: db,
}
// Next, insert both nodes into the database, they should both be
// inserted without any issues.
if err := graph.UpdateEdgeInfo(edge1); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
if err := graph.UpdateEdgeInfo(edge2); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
// With the edges inserted, perform some queries to ensure that they've
// been inserted properly.
dbEdge1, dbEdge2, err := graph.FetchChannelEdgesByID(chanID)
if err != nil {
t.Fatalf("unable to fetch channel by ID: %v", err)
}
if !reflect.DeepEqual(dbEdge1, edge1) {
t.Fatalf("edge doesn't match: expected %#v, \n got %#v", edge1,
dbEdge1)
}
if !reflect.DeepEqual(dbEdge2, edge2) {
t.Fatalf("edge doesn't match: expected %#v, \n got %#v", edge2,
dbEdge2)
}
// Next, attempt to query the channel edges according to the outpoint
// of the channel.
dbEdge1, dbEdge2, err = graph.FetchChannelEdgesByOutpoint(&outpoint)
if err != nil {
t.Fatalf("unable to fetch channel by ID: %v", err)
}
if !reflect.DeepEqual(dbEdge1, edge1) {
t.Fatalf("edge doesn't match: expected %#v, \n got %#v", edge1,
dbEdge1)
}
if !reflect.DeepEqual(dbEdge2, edge2) {
t.Fatalf("edge doesn't match: expected %#v, \n got %#v", edge2,
dbEdge2)
}
}
func randEdge(chanID uint64, op wire.OutPoint, db *DB) *ChannelEdge {
update := prand.Int63()
return &ChannelEdge{
ChannelID: chanID,
ChannelPoint: op,
LastUpdate: time.Unix(update, 0),
Expiry: uint16(prand.Int63()),
MinHTLC: btcutil.Amount(prand.Int63()),
FeeBaseMSat: btcutil.Amount(prand.Int63()),
FeeProportionalMillionths: btcutil.Amount(prand.Int63()),
Capacity: btcutil.Amount(prand.Int63()),
db: db,
}
}
func TestGraphTraversal(t *testing.T) {
db, cleanUp, err := makeTestDB()
if err != nil {
t.Fatalf("unable to make test database: %v", err)
}
defer cleanUp()
graph := db.ChannelGraph()
// We'd like to test some of the graph traversal capabilities within
// the DB, so we'll create a series of fake nodes to insert into the
// graph.
const numNodes = 20
nodes := make([]*LightningNode, numNodes)
nodeIndex := map[string]struct{}{}
for i := 0; i < numNodes; i++ {
node, err := createTestVertex(db)
if err != nil {
t.Fatalf("unable to create node: %v", err)
}
nodes[i] = node
nodeIndex[node.Alias] = struct{}{}
}
// Add each of the nodes into the graph, they should be inserted
// without error.
for _, node := range nodes {
if err := graph.AddLightningNode(node); err != nil {
t.Fatalf("unable to add node: %v", err)
}
}
// Iterate over each node as returned by the graph, if all nodes are
// reached, then the map created above should be empty.
err = graph.ForEachNode(func(node *LightningNode) error {
delete(nodeIndex, node.Alias)
return nil
})
if err != nil {
t.Fatalf("for each failure: %v", err)
}
if len(nodeIndex) != 0 {
t.Fatalf("all nodes not reached within ForEach")
}
// Determine which node is "smaller", we'll need this in order to
// properly create the edges for the graph.
var firstNode, secondNode *LightningNode
node1Bytes := nodes[0].PubKey.SerializeCompressed()
node2Bytes := nodes[1].PubKey.SerializeCompressed()
if bytes.Compare(node1Bytes, node2Bytes) == -1 {
firstNode = nodes[0]
secondNode = nodes[1]
} else {
firstNode = nodes[0]
secondNode = nodes[1]
}
// Create 5 channels between the first two nodes we generated above.
const numChannels = 5
chanIndex := map[uint64]struct{}{}
for i := 0; i < numChannels; i++ {
txHash := fastsha256.Sum256([]byte{byte(i)})
chanID := uint64(i + 1)
op := wire.OutPoint{
Hash: txHash,
Index: 0,
}
err := graph.AddChannelEdge(nodes[0].PubKey, nodes[1].PubKey,
&op, chanID)
if err != nil {
t.Fatalf("unable to add node: %v", err)
}
// Create and add an edge with random data that points from
// node1 -> node2.
edge := randEdge(chanID, op, db)
edge.Flags = 0
edge.Node = secondNode
if err := graph.UpdateEdgeInfo(edge); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
// Create another random edge that points from node2 -> node1
// this time.
edge = randEdge(chanID, op, db)
edge.Flags = 1
edge.Node = firstNode
if err := graph.UpdateEdgeInfo(edge); err != nil {
t.Fatalf("unable to update edge: %v", err)
}
chanIndex[chanID] = struct{}{}
}
// Iterate through all the known channels within the graph DB, once
// again if the map is empty that that indicates that all edges have
// properly been reached.
err = graph.ForEachChannel(func(_, e *ChannelEdge) error {
delete(chanIndex, e.ChannelID)
return nil
})
if err != nil {
t.Fatalf("for each failure: %v", err)
}
if len(chanIndex) != 0 {
t.Fatalf("all edges not reached within ForEach")
}
// Finally, we want to test the ability to iterate over all the
// outgoing channels for a particular node.
numNodeChans := 0
err = firstNode.ForEachChannel(func(c *ChannelEdge) error {
// Each each should indicate that it's outgoing (pointed
// towards the second node).
if !c.Node.PubKey.IsEqual(secondNode.PubKey) {
return fmt.Errorf("wrong outgoing edge")
}
numNodeChans += 1
return nil
})
if err != nil {
t.Fatalf("for each failure: %v", err)
}
if numNodeChans != numChannels {
t.Fatalf("all edges for node reached within ForEach")
}
}