cosmos-sdk/pruning/manager.go

283 lines
8.7 KiB
Go
Raw Normal View History

package pruning
import (
"container/list"
"encoding/binary"
"fmt"
"sync"
"github.com/tendermint/tendermint/libs/log"
dbm "github.com/tendermint/tm-db"
"github.com/cosmos/cosmos-sdk/pruning/types"
)
// Manager is an abstraction to handle the logic needed for
// determinging when to prune old heights of the store
// based on the strategy described by the pruning options.
type Manager struct {
db dbm.DB
logger log.Logger
opts types.PruningOptions
snapshotInterval uint64
// Although pruneHeights happen in the same goroutine with the normal execution,
// we sync access to them to avoid soundness issues in the future if concurrency pattern changes.
pruneHeightsMx sync.Mutex
pruneHeights []int64
// Snapshots are taken in a separate goroutine from the regular execution
// and can be delivered asynchrounously via HandleHeightSnapshot.
// Therefore, we sync access to pruneSnapshotHeights with this mutex.
pruneSnapshotHeightsMx sync.Mutex
// These are the heights that are multiples of snapshotInterval and kept for state sync snapshots.
// The heights are added to this list to be pruned when a snapshot is complete.
pruneSnapshotHeights *list.List
}
// NegativeHeightsError is returned when a negative height is provided to the manager.
type NegativeHeightsError struct {
Height int64
}
var _ error = &NegativeHeightsError{}
func (e *NegativeHeightsError) Error() string {
return fmt.Sprintf("failed to get pruned heights: %d", e.Height)
}
var (
pruneHeightsKey = []byte("s/pruneheights")
pruneSnapshotHeightsKey = []byte("s/prunesnapshotheights")
)
// NewManager returns a new Manager with the given db and logger.
// The retuned manager uses a pruning strategy of "nothing" which
// keeps all heights. Users of the Manager may change the strategy
// by calling SetOptions.
func NewManager(db dbm.DB, logger log.Logger) *Manager {
return &Manager{
db: db,
logger: logger,
opts: types.NewPruningOptions(types.PruningNothing),
pruneHeights: []int64{},
pruneSnapshotHeights: list.New(),
}
}
// SetOptions sets the pruning strategy on the manager.
func (m *Manager) SetOptions(opts types.PruningOptions) {
m.opts = opts
}
// GetOptions fetches the pruning strategy from the manager.
func (m *Manager) GetOptions() types.PruningOptions {
return m.opts
}
// GetFlushAndResetPruningHeights returns all heights to be pruned during the next call to Prune().
// It also flushes and resets the pruning heights.
func (m *Manager) GetFlushAndResetPruningHeights() ([]int64, error) {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return []int64{}, nil
}
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()
// flush the updates to disk so that it is not lost if crash happens.
if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
return nil, err
}
// Return a copy to prevent data races.
pruningHeights := make([]int64, len(m.pruneHeights))
copy(pruningHeights, m.pruneHeights)
m.pruneHeights = m.pruneHeights[:0]
return pruningHeights, nil
}
// HandleHeight determines if previousHeight height needs to be kept for pruning at the right interval prescribed by
// the pruning strategy. Returns previousHeight, if it was kept to be pruned at the next call to Prune(), 0 otherwise.
// previousHeight must be greater than 0 for the handling to take effect since valid heights start at 1 and 0 represents
// the latest height. The latest height cannot be pruned. As a result, if previousHeight is less than or equal to 0, 0 is returned.
func (m *Manager) HandleHeight(previousHeight int64) int64 {
if m.opts.GetPruningStrategy() == types.PruningNothing || previousHeight <= 0 {
return 0
}
defer func() {
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()
m.pruneSnapshotHeightsMx.Lock()
defer m.pruneSnapshotHeightsMx.Unlock()
// move persisted snapshot heights to pruneHeights which
// represent the heights to be pruned at the next pruning interval.
var next *list.Element
for e := m.pruneSnapshotHeights.Front(); e != nil; e = next {
snHeight := e.Value.(int64)
if snHeight < previousHeight-int64(m.opts.KeepRecent) {
m.pruneHeights = append(m.pruneHeights, snHeight)
// We must get next before removing to be able to continue iterating.
next = e.Next()
m.pruneSnapshotHeights.Remove(e)
} else {
next = e.Next()
}
}
// flush the updates to disk so that they are not lost if crash happens.
if err := m.db.SetSync(pruneHeightsKey, int64SliceToBytes(m.pruneHeights)); err != nil {
panic(err)
}
}()
if int64(m.opts.KeepRecent) < previousHeight {
pruneHeight := previousHeight - int64(m.opts.KeepRecent)
// We consider this height to be pruned iff:
//
// - snapshotInterval is zero as that means that all heights should be pruned.
// - snapshotInterval % (height - KeepRecent) != 0 as that means the height is not
// a 'snapshot' height.
if m.snapshotInterval == 0 || pruneHeight%int64(m.snapshotInterval) != 0 {
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()
m.pruneHeights = append(m.pruneHeights, pruneHeight)
return pruneHeight
}
}
return 0
}
// HandleHeightSnapshot persists the snapshot height to be pruned at the next appropriate
// height defined by the pruning strategy. Flushes the update to disk and panics if the flush fails
// The input height must be greater than 0 and pruning strategy any but pruning nothing.
// If one of these conditions is not met, this function does nothing.
func (m *Manager) HandleHeightSnapshot(height int64) {
if m.opts.GetPruningStrategy() == types.PruningNothing || height <= 0 {
return
}
m.pruneSnapshotHeightsMx.Lock()
defer m.pruneSnapshotHeightsMx.Unlock()
m.logger.Debug("HandleHeightSnapshot", "height", height)
m.pruneSnapshotHeights.PushBack(height)
// flush the updates to disk so that they are not lost if crash happens.
if err := m.db.SetSync(pruneSnapshotHeightsKey, listToBytes(m.pruneSnapshotHeights)); err != nil {
panic(err)
}
}
// SetSnapshotInterval sets the interval at which the snapshots are taken.
func (m *Manager) SetSnapshotInterval(snapshotInterval uint64) {
m.snapshotInterval = snapshotInterval
}
// ShouldPruneAtHeight return true if the given height should be pruned, false otherwise
func (m *Manager) ShouldPruneAtHeight(height int64) bool {
return m.opts.Interval > 0 && m.opts.GetPruningStrategy() != types.PruningNothing && height%int64(m.opts.Interval) == 0
}
// LoadPruningHeights loads the pruning heights from the database as a crash recovery.
func (m *Manager) LoadPruningHeights(db dbm.DB) error {
if m.opts.GetPruningStrategy() == types.PruningNothing {
return nil
}
loadedPruneHeights, err := loadPruningHeights(db)
if err != nil {
return err
}
if len(loadedPruneHeights) > 0 {
m.pruneHeightsMx.Lock()
defer m.pruneHeightsMx.Unlock()
m.pruneHeights = loadedPruneHeights
}
loadedPruneSnapshotHeights, err := loadPruningSnapshotHeights(db)
if err != nil {
return err
}
if loadedPruneSnapshotHeights.Len() > 0 {
m.pruneSnapshotHeightsMx.Lock()
defer m.pruneSnapshotHeightsMx.Unlock()
m.pruneSnapshotHeights = loadedPruneSnapshotHeights
}
return nil
}
func loadPruningHeights(db dbm.DB) ([]int64, error) {
bz, err := db.Get(pruneHeightsKey)
if err != nil {
return nil, fmt.Errorf("failed to get pruned heights: %w", err)
}
if len(bz) == 0 {
return []int64{}, nil
}
prunedHeights := make([]int64, len(bz)/8)
i, offset := 0, 0
for offset < len(bz) {
h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
if h < 0 {
return []int64{}, &NegativeHeightsError{Height: h}
}
prunedHeights[i] = h
i++
offset += 8
}
return prunedHeights, nil
}
func loadPruningSnapshotHeights(db dbm.DB) (*list.List, error) {
bz, err := db.Get(pruneSnapshotHeightsKey)
if err != nil {
return nil, fmt.Errorf("failed to get post-snapshot pruned heights: %w", err)
}
pruneSnapshotHeights := list.New()
if len(bz) == 0 {
return pruneSnapshotHeights, nil
}
i, offset := 0, 0
for offset < len(bz) {
h := int64(binary.BigEndian.Uint64(bz[offset : offset+8]))
if h < 0 {
return nil, &NegativeHeightsError{Height: h}
}
pruneSnapshotHeights.PushBack(h)
i++
offset += 8
}
return pruneSnapshotHeights, nil
}
func int64SliceToBytes(slice []int64) []byte {
bz := make([]byte, 0, len(slice)*8)
for _, ph := range slice {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(ph))
bz = append(bz, buf...)
}
return bz
}
func listToBytes(list *list.List) []byte {
bz := make([]byte, 0, list.Len()*8)
for e := list.Front(); e != nil; e = e.Next() {
buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, uint64(e.Value.(int64)))
bz = append(bz, buf...)
}
return bz
}