385 lines
11 KiB
Go
385 lines
11 KiB
Go
// Copyright (c) 2019-2020 The Zcash developers
|
|
// Distributed under the MIT software license, see the accompanying
|
|
// file COPYING or https://www.opensource.org/licenses/mit-license.php .
|
|
package common
|
|
|
|
import (
|
|
"bytes"
|
|
"encoding/binary"
|
|
"hash/fnv"
|
|
"io"
|
|
"io/ioutil"
|
|
"os"
|
|
"path/filepath"
|
|
"sync"
|
|
|
|
"github.com/golang/protobuf/proto"
|
|
"github.com/zcash/lightwalletd/walletrpc"
|
|
)
|
|
|
|
type blockCacheEntry struct {
|
|
data []byte
|
|
}
|
|
|
|
// BlockCache contains a consecutive set of recent compact blocks in marshalled form.
|
|
type BlockCache struct {
|
|
lengthsName, blocksName string // pathnames
|
|
lengthsFile, blocksFile *os.File
|
|
starts []int64 // Starting offset of each block within blocksFile
|
|
firstBlock int // height of the first block in the cache (usually Sapling activation)
|
|
nextBlock int // height of the first block not in the cache
|
|
latestHash []byte // hash of the most recent (highest height) block, for detecting reorgs.
|
|
mutex sync.RWMutex
|
|
}
|
|
|
|
func (c *BlockCache) GetNextHeight() int {
|
|
c.mutex.RLock()
|
|
defer c.mutex.RUnlock()
|
|
return c.nextBlock
|
|
}
|
|
|
|
func (c *BlockCache) GetLatestHash() []byte {
|
|
c.mutex.RLock()
|
|
defer c.mutex.RUnlock()
|
|
return c.latestHash
|
|
}
|
|
|
|
// HashMismatch indicates if the given prev-hash doesn't match the most recent block's hash
|
|
// so reorgs can be detected.
|
|
func (c *BlockCache) HashMismatch(prevhash []byte) bool {
|
|
c.mutex.RLock()
|
|
defer c.mutex.RUnlock()
|
|
return c.latestHash != nil && !bytes.Equal(c.latestHash, prevhash)
|
|
}
|
|
|
|
// Make the block at the given height the lowest height that we don't have.
|
|
// In other words, wipe out this height and beyond.
|
|
// This should never increase the size of the cache, only decrease.
|
|
// Caller should hold c.mutex.Lock().
|
|
func (c *BlockCache) setDbFiles(height int) {
|
|
if height <= c.nextBlock {
|
|
if height < c.firstBlock {
|
|
height = c.firstBlock
|
|
}
|
|
index := height - c.firstBlock
|
|
if err := c.lengthsFile.Truncate(int64(index * 4)); err != nil {
|
|
Log.Fatal("truncate lengths file failed: ", err)
|
|
}
|
|
if err := c.blocksFile.Truncate(c.starts[index]); err != nil {
|
|
Log.Fatal("truncate blocks file failed: ", err)
|
|
}
|
|
c.Sync()
|
|
c.starts = c.starts[:index+1]
|
|
c.nextBlock = height
|
|
c.setLatestHash()
|
|
}
|
|
}
|
|
|
|
func copyFile(src, dst string) error {
|
|
in, err := os.Open(src)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer in.Close()
|
|
|
|
out, err := os.Create(dst)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer out.Close()
|
|
|
|
_, err = io.Copy(out, in)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
return out.Close()
|
|
}
|
|
|
|
// Caller should hold c.mutex.Lock().
|
|
func (c *BlockCache) recoverFromCorruption(height int) {
|
|
Log.Warning("CORRUPTION detected in db blocks-cache files, height ", height, " redownloading")
|
|
|
|
// Save the corrupted files for post-mortem analysis.
|
|
save := c.lengthsName + "-corrupted"
|
|
if err := copyFile(c.lengthsName, save); err != nil {
|
|
Log.Warning("Could not copy db lengths file: ", err)
|
|
}
|
|
save = c.blocksName + "-corrupted"
|
|
if err := copyFile(c.blocksName, save); err != nil {
|
|
Log.Warning("Could not copy db lengths file: ", err)
|
|
}
|
|
|
|
c.setDbFiles(height)
|
|
}
|
|
|
|
// not including the checksum
|
|
func (c *BlockCache) blockLength(height int) int {
|
|
index := height - c.firstBlock
|
|
return int(c.starts[index+1] - c.starts[index] - 8)
|
|
}
|
|
|
|
// Calculate the 8-byte checksum that precedes each block in the blocks file.
|
|
func checksum(height int, b []byte) []byte {
|
|
h := make([]byte, 8)
|
|
binary.LittleEndian.PutUint64(h, uint64(height))
|
|
cs := fnv.New64a()
|
|
cs.Write(h)
|
|
cs.Write(b)
|
|
return cs.Sum(nil)
|
|
}
|
|
|
|
// Caller should hold (at least) c.mutex.RLock().
|
|
func (c *BlockCache) readBlock(height int) *walletrpc.CompactBlock {
|
|
blockLen := c.blockLength(height)
|
|
b := make([]byte, blockLen+8)
|
|
offset := c.starts[height-c.firstBlock]
|
|
n, err := c.blocksFile.ReadAt(b, offset)
|
|
if err != nil || n != len(b) {
|
|
Log.Warning("blocks read offset: ", offset, " failed: ", n, err)
|
|
return nil
|
|
}
|
|
diskcs := b[:8]
|
|
b = b[8 : blockLen+8]
|
|
if !bytes.Equal(checksum(height, b), diskcs) {
|
|
Log.Warning("bad block checksum at height: ", height, " offset: ", offset)
|
|
return nil
|
|
}
|
|
block := &walletrpc.CompactBlock{}
|
|
err = proto.Unmarshal(b, block)
|
|
if err != nil {
|
|
// Could be file corruption.
|
|
Log.Warning("blocks unmarshal at offset: ", offset, " failed: ", err)
|
|
return nil
|
|
}
|
|
if int(block.Height) != height {
|
|
// Could be file corruption.
|
|
Log.Warning("block unexpected height at height ", height, " offset: ", offset)
|
|
return nil
|
|
}
|
|
return block
|
|
}
|
|
|
|
// Caller should hold c.mutex.Lock().
|
|
func (c *BlockCache) setLatestHash() {
|
|
c.latestHash = nil
|
|
// There is at least one block; get the last block's hash
|
|
if c.nextBlock > c.firstBlock {
|
|
// At least one block remains; get the last block's hash
|
|
block := c.readBlock(c.nextBlock - 1)
|
|
if block == nil {
|
|
c.recoverFromCorruption(c.nextBlock - 10000)
|
|
return
|
|
}
|
|
c.latestHash = make([]byte, len(block.Hash))
|
|
copy(c.latestHash, block.Hash)
|
|
}
|
|
}
|
|
|
|
// NewBlockCache returns an instance of a block cache object.
|
|
// (No locking here, we assume this is single-threaded.)
|
|
func NewBlockCache(dbPath string, chainName string, startHeight int, redownload bool) *BlockCache {
|
|
c := &BlockCache{}
|
|
c.firstBlock = startHeight
|
|
c.nextBlock = startHeight
|
|
c.lengthsName, c.blocksName = dbFileNames(dbPath, chainName)
|
|
var err error
|
|
if err := os.MkdirAll(filepath.Join(dbPath, chainName), 0755); err != nil {
|
|
Log.Fatal("mkdir ", dbPath, " failed: ", err)
|
|
}
|
|
c.blocksFile, err = os.OpenFile(c.blocksName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
Log.Fatal("open ", c.blocksName, " failed: ", err)
|
|
}
|
|
c.lengthsFile, err = os.OpenFile(c.lengthsName, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0644)
|
|
if err != nil {
|
|
Log.Fatal("open ", c.lengthsName, " failed: ", err)
|
|
}
|
|
if redownload {
|
|
if err := c.lengthsFile.Truncate(0); err != nil {
|
|
Log.Fatal("truncate lengths file failed: ", err)
|
|
}
|
|
if err := c.blocksFile.Truncate(0); err != nil {
|
|
Log.Fatal("truncate blocks file failed: ", err)
|
|
}
|
|
}
|
|
lengths, err := ioutil.ReadFile(c.lengthsName)
|
|
if err != nil {
|
|
Log.Fatal("read ", c.lengthsName, " failed: ", err)
|
|
}
|
|
|
|
// The last entry in starts[] is where to write the next block.
|
|
var offset int64
|
|
c.starts = nil
|
|
c.starts = append(c.starts, 0)
|
|
for i := 0; i < len(lengths)/4; i++ {
|
|
if len(lengths[:4]) < 4 {
|
|
Log.Warning("lengths file has a partial entry")
|
|
c.recoverFromCorruption(c.nextBlock)
|
|
break
|
|
}
|
|
length := binary.LittleEndian.Uint32(lengths[i*4 : (i+1)*4])
|
|
if length < 78 || length > 4*1000*1000 {
|
|
Log.Warning("lengths file has impossible value ", length)
|
|
c.recoverFromCorruption(c.nextBlock)
|
|
break
|
|
}
|
|
offset += int64(length) + 8
|
|
c.starts = append(c.starts, offset)
|
|
// Check for corruption.
|
|
block := c.readBlock(c.nextBlock)
|
|
if block == nil {
|
|
Log.Warning("error reading block")
|
|
c.recoverFromCorruption(c.nextBlock)
|
|
break
|
|
}
|
|
c.nextBlock++
|
|
}
|
|
c.setDbFiles(c.nextBlock)
|
|
Log.Info("Found ", c.nextBlock-c.firstBlock, " blocks in cache")
|
|
return c
|
|
}
|
|
|
|
func dbFileNames(dbPath string, chainName string) (string, string) {
|
|
return filepath.Join(dbPath, chainName, "lengths"),
|
|
filepath.Join(dbPath, chainName, "blocks")
|
|
}
|
|
|
|
// Add adds the given block to the cache at the given height, returning true
|
|
// if a reorg was detected.
|
|
func (c *BlockCache) Add(height int, block *walletrpc.CompactBlock) error {
|
|
// Invariant: m[firstBlock..nextBlock) are valid.
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
if height > c.nextBlock {
|
|
// Cache has been reset (for example, checksum error)
|
|
return nil
|
|
}
|
|
if height < c.firstBlock {
|
|
// Should never try to add a block before Sapling activation height
|
|
Log.Fatal("cache.Add height below Sapling: ", height)
|
|
return nil
|
|
}
|
|
if height < c.nextBlock {
|
|
// Should never try to "backup" (call Reorg() instead).
|
|
Log.Fatal("cache.Add height going backwards: ", height)
|
|
return nil
|
|
}
|
|
bheight := int(block.Height)
|
|
|
|
// TODO COINBASE-HEIGHT: restore this check after coinbase height is fixed
|
|
if false && bheight != height {
|
|
// This could only happen if zcashd returned the wrong
|
|
// block (not the height we requested).
|
|
Log.Fatal("cache.Add wrong height: ", bheight, " expecting: ", height)
|
|
return nil
|
|
}
|
|
|
|
// Add the new block and its length to the db files.
|
|
data, err := proto.Marshal(block)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
_, err = c.blocksFile.Write(append(checksum(height, data), data...))
|
|
if err != nil {
|
|
Log.Fatal("blocks write failed: ", err)
|
|
}
|
|
b := make([]byte, 4)
|
|
binary.LittleEndian.PutUint32(b, uint32(len(data)))
|
|
_, err = c.lengthsFile.Write(b)
|
|
if err != nil {
|
|
Log.Fatal("lengths write failed: ", err)
|
|
}
|
|
|
|
// update the in-memory variables
|
|
offset := c.starts[len(c.starts)-1]
|
|
c.starts = append(c.starts, offset+int64(len(data)+8))
|
|
|
|
if c.latestHash == nil {
|
|
c.latestHash = make([]byte, len(block.Hash))
|
|
}
|
|
copy(c.latestHash, block.Hash)
|
|
c.nextBlock++
|
|
// Invariant: m[firstBlock..nextBlock) are valid.
|
|
return nil
|
|
}
|
|
|
|
// Reorg resets nextBlock (the block that should be Add()ed next)
|
|
// downward to the given height.
|
|
func (c *BlockCache) Reorg(height int) {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
// Allow the caller not to have to worry about Sapling start height.
|
|
if height < c.firstBlock {
|
|
height = c.firstBlock
|
|
}
|
|
if height >= c.nextBlock {
|
|
// Timing window, ignore this request
|
|
return
|
|
}
|
|
// Remove the end of the cache.
|
|
c.nextBlock = height
|
|
newCacheLen := height - c.firstBlock
|
|
c.starts = c.starts[:newCacheLen+1]
|
|
|
|
if err := c.lengthsFile.Truncate(int64(4 * newCacheLen)); err != nil {
|
|
Log.Fatal("truncate failed: ", err)
|
|
}
|
|
if err := c.blocksFile.Truncate(c.starts[newCacheLen]); err != nil {
|
|
Log.Fatal("truncate failed: ", err)
|
|
}
|
|
c.setLatestHash()
|
|
}
|
|
|
|
// Get returns the compact block at the requested height if it's
|
|
// in the cache, else nil.
|
|
func (c *BlockCache) Get(height int) *walletrpc.CompactBlock {
|
|
c.mutex.RLock()
|
|
defer c.mutex.RUnlock()
|
|
|
|
if height < c.firstBlock || height >= c.nextBlock {
|
|
return nil
|
|
}
|
|
block := c.readBlock(height)
|
|
if block == nil {
|
|
go func() {
|
|
// We hold only the read lock, need the exclusive lock.
|
|
c.mutex.Lock()
|
|
c.recoverFromCorruption(height - 10000)
|
|
c.mutex.Unlock()
|
|
}()
|
|
return nil
|
|
}
|
|
return block
|
|
}
|
|
|
|
// GetLatestHeight returns the height of the most recent block, or -1
|
|
// if the cache is empty.
|
|
func (c *BlockCache) GetLatestHeight() int {
|
|
c.mutex.RLock()
|
|
defer c.mutex.RUnlock()
|
|
if c.firstBlock == c.nextBlock {
|
|
return -1
|
|
}
|
|
return c.nextBlock - 1
|
|
}
|
|
|
|
func (c *BlockCache) Sync() {
|
|
c.lengthsFile.Sync()
|
|
c.blocksFile.Sync()
|
|
}
|
|
|
|
// Currently used only for testing.
|
|
func (c *BlockCache) Close() {
|
|
// Some operating system require you to close files before you can remove them.
|
|
if c.lengthsFile != nil {
|
|
c.lengthsFile.Close()
|
|
c.lengthsFile = nil
|
|
}
|
|
if c.blocksFile != nil {
|
|
c.blocksFile.Close()
|
|
c.blocksFile = nil
|
|
}
|
|
}
|