channeldb: embed the instance of boltb within DB struct

This commit modifies the composition of the boltdb pointer within the
DB struct to use embedding.

The rationale for this change is that the daemon may soon store some
semi-transient items within the database which requires us to expose
the boltdb’s transaction API. The logic for serialization of this data
will likely lie outside of the channeldb package as the items that may
be stored in the future will be specific to the current sub-systems
within the daemon and not generic channel related data.
This commit is contained in:
Olaoluwa Osuntokun 2016-11-27 18:32:45 -08:00
parent 327768f4ad
commit 8312ce587a
No known key found for this signature in database
GPG Key ID: 9CC5B105D03521A2
6 changed files with 30 additions and 39 deletions

View File

@ -255,7 +255,7 @@ func (c *OpenChannel) FullSync() error {
c.Lock()
defer c.Unlock()
return c.Db.store.Update(c.fullSync)
return c.Db.Update(c.fullSync)
}
// fullSync is an internal versino of the FullSync method which allows callers
@ -310,7 +310,7 @@ func (c *OpenChannel) FullSyncWithAddr(addr *net.TCPAddr) error {
c.Lock()
defer c.Unlock()
return c.Db.store.Update(func(tx *bolt.Tx) error {
return c.Db.Update(func(tx *bolt.Tx) error {
// First, sync all the persistent channel state to disk.
if err := c.fullSync(tx); err != nil {
return err
@ -348,7 +348,7 @@ func (c *OpenChannel) UpdateCommitment(newCommitment *wire.MsgTx,
c.Lock()
defer c.Unlock()
return c.Db.store.Update(func(tx *bolt.Tx) error {
return c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
@ -417,8 +417,8 @@ type HTLC struct {
// the funds on-chain in the case of a unilateral channel closure.
RevocationDelay uint32
// OutputIndex is the vout output index for this particular HTLC output
// on the commitment transaction.
// OutputIndex is the output index for this particular HTLC output
// within the commitment transaction.
OutputIndex uint16
}
@ -453,7 +453,7 @@ type ChannelDelta struct {
// this log can be consulted in order to reconstruct the state needed to
// rectify the situation.
func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error {
return c.Db.store.Update(func(tx *bolt.Tx) error {
return c.Db.Update(func(tx *bolt.Tx) error {
chanBucket, err := tx.CreateBucketIfNotExists(openChannelBucket)
if err != nil {
return err
@ -495,7 +495,7 @@ func (c *OpenChannel) AppendToRevocationLog(delta *ChannelDelta) error {
func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error) {
delta := &ChannelDelta{}
err := c.Db.store.View(func(tx *bolt.Tx) error {
err := c.Db.View(func(tx *bolt.Tx) error {
chanBucket := tx.Bucket(openChannelBucket)
nodePub := c.IdentityPub.SerializeCompressed()
@ -528,7 +528,7 @@ func (c *OpenChannel) FindPreviousState(updateNum uint64) (*ChannelDelta, error)
// purposes.
// TODO(roasbeef): delete on-disk set of HTLC's
func (c *OpenChannel) CloseChannel() error {
return c.Db.store.Update(func(tx *bolt.Tx) error {
return c.Db.Update(func(tx *bolt.Tx) error {
// First fetch the top level bucket which stores all data related to
// current, active channels.
chanBucket := tx.Bucket(openChannelBucket)
@ -682,8 +682,6 @@ func putOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
// fetchOpenChannel retrieves, and deserializes (including decrypting
// sensitive) the complete channel currently active with the passed nodeID.
// An EncryptorDecryptor is required to decrypt sensitive information stored
// within the database.
func fetchOpenChannel(openChanBucket *bolt.Bucket, nodeChanBucket *bolt.Bucket,
chanID *wire.OutPoint) (*OpenChannel, error) {

View File

@ -91,8 +91,7 @@ func makeTestDB() (*DB, func(), error) {
return nil, nil, err
}
// Next, create channeldb for the first time, also setting a mock
// EncryptorDecryptor implementation for testing purposes.
// Next, create channeldb for the first time.
cdb, err := Open(tempDirName, netParams)
if err != nil {
return nil, nil, err

View File

@ -51,17 +51,17 @@ var bufPool = &sync.Pool{
New: func() interface{} { return new(bytes.Buffer) },
}
// DB is the primary datastore for the LND daemon. The database stores
// DB is the primary datastore for the lnd daemon. The database stores
// information related to nodes, routing data, open/closed channels, fee
// schedules, and reputation data.
type DB struct {
store *bolt.DB
*bolt.DB
netParams *chaincfg.Params
dbPath string
}
// Open opens an existing channeldb created under the passed namespace with
// sensitive data encrypted by the passed EncryptorDecryptor implementation.
// Open opens an existing channeldb. Any necessary schemas migrations due to
// udpates will take plave as necessary.
func Open(dbPath string, netParams *chaincfg.Params) (*DB, error) {
path := filepath.Join(dbPath, dbName)
@ -77,7 +77,7 @@ func Open(dbPath string, netParams *chaincfg.Params) (*DB, error) {
}
chanDB := &DB{
store: bdb,
DB: bdb,
netParams: netParams,
dbPath: dbPath,
}
@ -95,7 +95,6 @@ func Open(dbPath string, netParams *chaincfg.Params) (*DB, error) {
// database. The deletion is done in a single transaction, therefore this
// operation is fully atomic.
func (d *DB) Wipe() error {
return d.store.Update(func(tx *bolt.Tx) error {
err := tx.DeleteBucket(openChannelBucket)
if err != nil && err != bolt.ErrBucketNotFound {
return err
@ -120,11 +119,6 @@ func (d *DB) Wipe() error {
})
}
// Close terminates the underlying database handle manually.
func (d *DB) Close() error {
return d.store.Close()
}
// createChannelDB creates and initializes a fresh version of channeldb. In
// the case that the target path has not yet been created or doesn't yet exist,
// then the path is created. Additionally, all required top-level buckets used
@ -189,7 +183,7 @@ func fileExists(path string) bool {
// returned.
func (d *DB) FetchOpenChannels(nodeID *btcec.PublicKey) ([]*OpenChannel, error) {
var channels []*OpenChannel
err := d.store.View(func(tx *bolt.Tx) error {
err := d.View(func(tx *bolt.Tx) error {
// Get the bucket dedicated to storing the meta-data for open
// channels.
openChanBucket := tx.Bucket(openChannelBucket)
@ -269,7 +263,7 @@ func (d *DB) fetchNodeChannels(openChanBucket,
func (d *DB) FetchAllChannels() ([]*OpenChannel, error) {
var channels []*OpenChannel
err := d.store.View(func(tx *bolt.Tx) error {
err := d.View(func(tx *bolt.Tx) error {
// Get the bucket dedicated to storing the meta-data for open
// channels.
openChanBucket := tx.Bucket(openChannelBucket)
@ -327,7 +321,7 @@ func (d *DB) syncVersions(versions []version) error {
// execute them serially within a single database transaction to ensure
// the migration is atomic.
migrations := getMigrationsToApply(versions, meta.DbVersionNumber)
return d.store.Update(func(tx *bolt.Tx) error {
return d.Update(func(tx *bolt.Tx) error {
for _, migration := range migrations {
if migration == nil {
continue

View File

@ -113,7 +113,7 @@ func (d *DB) AddInvoice(i *Invoice) error {
len(i.Receipt))
}
return d.store.Update(func(tx *bolt.Tx) error {
return d.Update(func(tx *bolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err
@ -157,7 +157,7 @@ func (d *DB) AddInvoice(i *Invoice) error {
// terms of the payment.
func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) {
var invoice *Invoice
err := d.store.View(func(tx *bolt.Tx) error {
err := d.View(func(tx *bolt.Tx) error {
invoices := tx.Bucket(invoiceBucket)
if invoices == nil {
return ErrInvoiceNotFound
@ -197,7 +197,7 @@ func (d *DB) LookupInvoice(paymentHash [32]byte) (*Invoice, error) {
func (d *DB) FetchAllInvoices(pendingOnly bool) ([]*Invoice, error) {
var invoices []*Invoice
err := d.store.View(func(tx *bolt.Tx) error {
err := d.View(func(tx *bolt.Tx) error {
invoiceB := tx.Bucket(invoiceBucket)
if invoiceB == nil {
return ErrNoInvoicesCreated
@ -238,7 +238,7 @@ func (d *DB) FetchAllInvoices(pendingOnly bool) ([]*Invoice, error) {
// hash doesn't existing within the database, then the action will fail with a
// "not found" error.
func (d *DB) SettleInvoice(paymentHash [32]byte) error {
return d.store.Update(func(tx *bolt.Tx) error {
return d.Update(func(tx *bolt.Tx) error {
invoices, err := tx.CreateBucketIfNotExists(invoiceBucket)
if err != nil {
return err

View File

@ -171,7 +171,7 @@ func TestMigrationWithPanic(t *testing.T) {
beforeMigrationFunc := func(d *DB) {
// Insert data in database and in order then make sure that the
// key isn't changes in case of panic or fail.
d.store.Update(func(tx *bolt.Tx) error {
d.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketPrefix)
if err != nil {
return err
@ -205,7 +205,7 @@ func TestMigrationWithPanic(t *testing.T) {
t.Fatal("migration paniced but version is changed")
}
err = d.store.Update(func(tx *bolt.Tx) error {
err = d.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketPrefix)
if err != nil {
return err
@ -238,7 +238,7 @@ func TestMigrationWithFatal(t *testing.T) {
afterMigration := []byte("aftermigration")
beforeMigrationFunc := func(d *DB) {
d.store.Update(func(tx *bolt.Tx) error {
d.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketPrefix)
if err != nil {
return err
@ -273,7 +273,7 @@ func TestMigrationWithFatal(t *testing.T) {
t.Fatal("migration failed but version is changed")
}
err = d.store.Update(func(tx *bolt.Tx) error {
err = d.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketPrefix)
if err != nil {
return err
@ -307,7 +307,7 @@ func TestMigrationWithoutErrors(t *testing.T) {
// Populate database with initial data.
beforeMigrationFunc := func(d *DB) {
d.store.Update(func(tx *bolt.Tx) error {
d.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketPrefix)
if err != nil {
return err
@ -341,7 +341,7 @@ func TestMigrationWithoutErrors(t *testing.T) {
"succesfully aplied migration")
}
err = d.store.Update(func(tx *bolt.Tx) error {
err = d.Update(func(tx *bolt.Tx) error {
bucket, err := tx.CreateBucketIfNotExists(bucketPrefix)
if err != nil {
return err

View File

@ -104,7 +104,7 @@ func (l *LinkNode) Sync() error {
// Finally update the database by storing the link node and updating
// any relevant indexes.
return l.db.store.Update(func(tx *bolt.Tx) error {
return l.db.Update(func(tx *bolt.Tx) error {
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeInfoBucket == nil {
return fmt.Errorf("node bucket not created")
@ -139,7 +139,7 @@ func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
err error
)
err = db.store.View(func(tx *bolt.Tx) error {
err = db.View(func(tx *bolt.Tx) error {
// First fetch the bucket for storing node meta-data, bailing
// out early if it hasn't been created yet.
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
@ -178,7 +178,7 @@ func (db *DB) FetchLinkNode(identity *btcec.PublicKey) (*LinkNode, error) {
func (db *DB) FetchAllLinkNodes() ([]*LinkNode, error) {
var linkNodes []*LinkNode
err := db.store.View(func(tx *bolt.Tx) error {
err := db.View(func(tx *bolt.Tx) error {
nodeMetaBucket := tx.Bucket(nodeInfoBucket)
if nodeInfoBucket == nil {
return fmt.Errorf("node bucket not created")