nursery_store: adds graduating state and proper finalization

This commit is contained in:
Conner Fromknecht 2017-10-11 16:33:12 -07:00
parent a8450875f6
commit f02f1355e7
No known key found for this signature in database
GPG Key ID: 39DE78FBE6ACB0EF
1 changed files with 193 additions and 64 deletions

View File

@ -52,13 +52,15 @@ type NurseryStore interface {
// TODO: make this handle one output at a time? // TODO: make this handle one output at a time?
AwardDiplomas(...kidOutput) ([]wire.OutPoint, error) AwardDiplomas(...kidOutput) ([]wire.OutPoint, error)
// FinalizeClass accepts a block height as a parameter and purges its IsMatureChannel(*wire.OutPoint) (bool, error)
// TryFinalizeClass accepts a block height as a parameter and purges its
// persistent state for all outputs at that height. During a restart, // persistent state for all outputs at that height. During a restart,
// the utxo nursery will begin it's recovery procedure from the next // the utxo nursery will begin it's recovery procedure from the next
// height that has yet to be finalized. This block height should lag // height that has yet to be finalized. This block height should lag
// beyond the best height for this chain as a measure of reorg // beyond the best height for this chain as a measure of reorg
// protection. // protection.
FinalizeClass(height uint32) error TryFinalizeClass(height uint32) error
// State Bucket Enumeration. // State Bucket Enumeration.
@ -83,6 +85,8 @@ type NurseryStore interface {
// whose type should be inferred from the key's prefix. // whose type should be inferred from the key's prefix.
ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error ForChanOutputs(*wire.OutPoint, func([]byte, []byte) error) error
RemoveChannel(*wire.OutPoint) error
// The Point of No Return. // The Point of No Return.
// LastFinalizedHeight returns the last block height for which the // LastFinalizedHeight returns the last block height for which the
@ -164,6 +168,8 @@ var (
// and will be swept into the wallet after waiting out the relative // and will be swept into the wallet after waiting out the relative
// timelock. // timelock.
kndrPrefix = []byte("kndr") kndrPrefix = []byte("kndr")
gradPrefix = []byte("grad")
) )
// Overview of Nursery Store Storage Hierarchy // Overview of Nursery Store Storage Hierarchy
@ -563,13 +569,32 @@ func (ns *nurseryStore) AwardDiplomas(
// proceed to mark the channels as closed. // proceed to mark the channels as closed.
// TODO(conner): write list of closed channels to separate bucket so // TODO(conner): write list of closed channels to separate bucket so
// that they can be replayed on restart? // that they can be replayed on restart?
var closedChannelSet = make(map[wire.OutPoint]struct{}) var possibleCloseSet = make(map[wire.OutPoint]struct{})
if err := ns.db.Update(func(tx *bolt.Tx) error { if err := ns.db.Update(func(tx *bolt.Tx) error {
for _, kid := range kids { for _, kid := range kids {
confHeight := kid.ConfHeight()
outpoint := kid.OutPoint()
chanPoint := kid.OriginChanPoint()
// Remove output from kindergarten bucket.
pfxOutputKey, err := prefixOutputKey(kndrPrefix, outpoint)
if err != nil {
return err
}
hghtChanBucket := ns.getHeightChanBucket(tx, confHeight, chanPoint)
if hghtChanBucket != nil {
if err := hghtChanBucket.DeleteBucket(pfxOutputKey); err != nil {
return err
}
// Attempt to prune the height bucket matching the kid // Attempt to prune the height bucket matching the kid
// output's confirmation height if it contains no active // output's confirmation height if it contains no active
// outputs. // outputs.
err := ns.pruneHeight(tx, kid.ConfHeight()) err := ns.pruneHeight(tx, confHeight)
switch err { switch err {
case ErrBucketNotEmpty: case ErrBucketNotEmpty:
// Bucket still has active outputs, proceed to // Bucket still has active outputs, proceed to
@ -582,43 +607,33 @@ func (ns *nurseryStore) AwardDiplomas(
case nil: case nil:
// Bucket was pruned successfully and no errors // Bucket was pruned successfully and no errors
// were encounter. // were encounter.
utxnLog.Infof("Height bucket %d pruned", utxnLog.Infof("Height bucket %d pruned", confHeight)
kid.ConfHeight())
default: default:
// Unexpected database error. // Unexpected database error.
return err return err
} }
}
outpoint := kid.OutPoint() chanBucket, err := ns.createChannelBucket(tx, chanPoint)
chanPoint := kid.OriginChanPoint() if err != nil {
return err
}
// Remove the outpoint belonging to the kid output from if err := chanBucket.Delete(pfxOutputKey); err != nil {
// it's channel bucket, then attempt to prune the return err
// channel bucket if it is now empty. }
err = ns.deleteAndPruneChannel(tx, chanPoint, outpoint)
switch err {
case ErrBucketNotEmpty:
// Bucket still has active outputs, continue to
// next kid to avoid adding this channel point
// to the set of channels to be closed.
continue
case ErrBucketDoesNotExist: // Convert kindergarten key to graduate key.
// Bucket may have been removed previously, copy(pfxOutputKey, gradPrefix)
// allow this to fall through and ensure the
// channel point is added to the set channels to
// be closed.
case nil: var gradBuffer bytes.Buffer
// Channel bucket was successfully pruned, if err := kid.Encode(&gradBuffer); err != nil {
// proceed to add to set of channels to be return err
// closed. }
utxnLog.Infof("Height bucket %d pruned",
kid.ConfHeight())
default: err = chanBucket.Put(pfxOutputKey, gradBuffer.Bytes())
// Uh oh, database error. if err != nil {
return err return err
} }
@ -628,7 +643,7 @@ func (ns *nurseryStore) AwardDiplomas(
// to our set of closed channels to be closed, since // to our set of closed channels to be closed, since
// these may need to be replayed to ensure the channel // these may need to be replayed to ensure the channel
// database is aware that incubation has completed. // database is aware that incubation has completed.
closedChannelSet[*chanPoint] = struct{}{} possibleCloseSet[*chanPoint] = struct{}{}
} }
return nil return nil
@ -637,25 +652,31 @@ func (ns *nurseryStore) AwardDiplomas(
} }
// Convert our set of channels to be closed into a list. // Convert our set of channels to be closed into a list.
channelsToBeClosed := make([]wire.OutPoint, 0, len(closedChannelSet)) possibleCloses := make([]wire.OutPoint, 0, len(possibleCloseSet))
for chanPoint := range closedChannelSet { for chanPoint := range possibleCloseSet {
channelsToBeClosed = append(channelsToBeClosed, chanPoint) possibleCloses = append(possibleCloses, chanPoint)
} }
utxnLog.Infof("Channels to be marked fully closed: %x", utxnLog.Infof("Possible channels to be marked fully closed: %v",
channelsToBeClosed) possibleCloses)
return channelsToBeClosed, nil return possibleCloses, nil
} }
// FinalizeClass accepts a block height as a parameter and purges its // TryFinalizeClass accepts a block height as a parameter and purges its
// persistent state for all outputs at that height. During a restart, the utxo // persistent state for all outputs at that height. During a restart, the utxo
// nursery will begin it's recovery procedure from the next height that has // nursery will begin it's recovery procedure from the next height that has
// yet to be finalized. // yet to be finalized.
func (ns *nurseryStore) FinalizeClass(height uint32) error { func (ns *nurseryStore) TryFinalizeClass(height uint32) error {
utxnLog.Infof("Finalizing class at height %v", height)
return ns.db.Update(func(tx *bolt.Tx) error { return ns.db.Update(func(tx *bolt.Tx) error {
return ns.putLastFinalizedHeight(tx, height) utxnLog.Infof("Attempting to finalize class at height %v", height)
lastHeight, err := ns.getNextLastFinalizedHeight(tx, height)
if err != nil {
return err
}
utxnLog.Infof("Finalizing class at height %v", lastHeight)
return ns.putLastFinalizedHeight(tx, lastHeight)
}) })
} }
@ -697,9 +718,11 @@ func (ns *nurseryStore) FetchCribs(height uint32) ([]babyOutput, error) {
func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) { func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) {
// Construct a list of all kidOutputs that mature at the provided block // Construct a list of all kidOutputs that mature at the provided block
// height. // height.
utxnLog.Infof("Fetching kinders")
var kids []kidOutput var kids []kidOutput
if err := ns.forEachHeightPrefix(kndrPrefix, height, if err := ns.forEachHeightPrefix(kndrPrefix, height,
func(buf []byte) error { func(buf []byte) error {
utxnLog.Infof("Inside kinder")
// We will attempt to deserialize all outputs stored // We will attempt to deserialize all outputs stored
// with the kindergarten prefix into kidOutputs, since // with the kindergarten prefix into kidOutputs, since
@ -719,6 +742,7 @@ func (ns *nurseryStore) FetchKindergartens(height uint32) ([]kidOutput, error) {
}); err != nil { }); err != nil {
return nil, err return nil, err
} }
utxnLog.Infof("Returning kinders")
return kids, nil return kids, nil
} }
@ -816,13 +840,49 @@ func (ns *nurseryStore) ForChanOutputs(chanPoint *wire.OutPoint,
callback func([]byte, []byte) error) error { callback func([]byte, []byte) error) error {
return ns.db.View(func(tx *bolt.Tx) error { return ns.db.View(func(tx *bolt.Tx) error {
return ns.forChanOutputs(tx, chanPoint, callback)
})
}
func (ns *nurseryStore) forChanOutputs(tx *bolt.Tx, chanPoint *wire.OutPoint,
callback func([]byte, []byte) error) error {
chanBucket := ns.getChannelBucket(tx, chanPoint) chanBucket := ns.getChannelBucket(tx, chanPoint)
if chanBucket == nil { if chanBucket == nil {
return ErrContractNotFound return ErrContractNotFound
} }
return chanBucket.ForEach(callback) return chanBucket.ForEach(callback)
}) }
func (ns *nurseryStore) IsMatureChannel(chanPoint *wire.OutPoint) (bool, error) {
var isMature bool
if err := ns.db.View(func(tx *bolt.Tx) error {
var nOutputs, nGrads int
if err := ns.forChanOutputs(tx, chanPoint, func(pfxKey, _ []byte) error {
// Count total and number of grad outputs
if string(pfxKey[:4]) == string(gradPrefix) {
nGrads++
}
nOutputs++
return nil
}); err != nil {
return err
}
utxnLog.Infof("Found %d graduated outputs out of %d", nGrads, nOutputs)
// Channel is mature if all outputs are graduated.
if nGrads == nOutputs {
isMature = true
}
return nil
}); err != nil {
return false, err
}
return isMature, nil
} }
// The Point of No Return. // The Point of No Return.
@ -867,6 +927,49 @@ func (ns *nurseryStore) getLastFinalizedHeight(tx *bolt.Tx) (uint32, error) {
return byteOrder.Uint32(heightBytes), nil return byteOrder.Uint32(heightBytes), nil
} }
func (ns *nurseryStore) getNextLastFinalizedHeight(tx *bolt.Tx, height uint32) (uint32, error) {
// Retrieve the previous last finalized height.
lastHeight, err := ns.getLastFinalizedHeight(tx)
if err != nil {
return 0, err
}
// TODO(conner): add lower bound after which all state is purged.
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
if chainBucket == nil {
return lastHeight, nil
}
// Retrieve the existing height index.
hghtIndex := chainBucket.Bucket(heightIndexKey)
if hghtIndex == nil {
return lastHeight, nil
}
for curHeight := lastHeight + 1; curHeight <= height; curHeight++ {
var curHeightBytes [4]byte
byteOrder.PutUint32(curHeightBytes[:], curHeight)
hghtBucket := hghtIndex.Bucket(curHeightBytes[:])
if hghtBucket == nil {
continue
}
nChildren, err := ns.numChildrenInBucket(hghtBucket)
if err != nil {
return 0, err
}
if nChildren > 0 {
return curHeight - 1, nil
}
}
return height, nil
}
// pubLastFinalizedHeight is a helper method that writes the provided height // pubLastFinalizedHeight is a helper method that writes the provided height
// under the last finalized height key. // under the last finalized height key.
func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx, func (ns *nurseryStore) putLastFinalizedHeight(tx *bolt.Tx,
@ -1135,6 +1238,7 @@ func (ns *nurseryStore) forEachHeightPrefix(prefix []byte, height uint32,
// the appropriate channel bucket. // the appropriate channel bucket.
outputBytes := chanBucket.Get(pfxOutputKey) outputBytes := chanBucket.Get(pfxOutputKey)
if outputBytes == nil { if outputBytes == nil {
pfxOutputKey, _ = c.Next()
continue continue
} }
@ -1228,6 +1332,31 @@ func (ns *nurseryStore) deleteAndPruneChannel(tx *bolt.Tx,
return ns.removeBucketIfEmpty(chanIndex, chanBytes) return ns.removeBucketIfEmpty(chanIndex, chanBytes)
} }
func (ns *nurseryStore) RemoveChannel(chanPoint *wire.OutPoint) error {
return ns.db.Update(func(tx *bolt.Tx) error {
// Retrieve the existing chain bucket for this nursery store.
chainBucket := tx.Bucket(ns.pfxChainKey)
if chainBucket == nil {
return nil
}
// Retrieve the channel index stored in the chain bucket.
chanIndex := chainBucket.Bucket(channelIndexKey)
if chanIndex == nil {
return nil
}
// Serialize the provided channel point, such that we can delete
// the mature channel bucket.
var chanBuffer bytes.Buffer
if err := writeOutpoint(&chanBuffer, chanPoint); err != nil {
return err
}
return chanIndex.DeleteBucket(chanBuffer.Bytes())
})
}
// pruneHeight // pruneHeight
// NOTE: This method returns two concrete errors apart from those returned by // NOTE: This method returns two concrete errors apart from those returned by
// the underlying database: ErrBucketDoesNotExist and ErrBucketNotEmpty. These // the underlying database: ErrBucketDoesNotExist and ErrBucketNotEmpty. These