improve reorg by using getbestblockhash

This commit is contained in:
Larry Ruane 2021-08-10 23:29:48 -06:00 committed by Larry Ruane
parent b1f3687d83
commit bdaac63f3e
5 changed files with 297 additions and 106 deletions

View File

@ -51,12 +51,12 @@ func (c *BlockCache) GetLatestHash() []byte {
return c.latestHash return c.latestHash
} }
// HashMismatch indicates if the given prev-hash doesn't match the most recent block's hash // HashMatch indicates if the given prev-hash matches the most recent block's hash
// so reorgs can be detected. // so reorgs can be detected.
func (c *BlockCache) HashMismatch(prevhash []byte) bool { func (c *BlockCache) HashMatch(prevhash []byte) bool {
c.mutex.RLock() c.mutex.RLock()
defer c.mutex.RUnlock() defer c.mutex.RUnlock()
return c.latestHash != nil && !bytes.Equal(c.latestHash, prevhash) return c.latestHash == nil || bytes.Equal(c.latestHash, prevhash)
} }
// Make the block at the given height the lowest height that we don't have. // Make the block at the given height the lowest height that we don't have.

View File

@ -226,7 +226,7 @@ func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) {
params := make([]json.RawMessage, 2) params := make([]json.RawMessage, 2)
heightJSON, err := json.Marshal(strconv.Itoa(height)) heightJSON, err := json.Marshal(strconv.Itoa(height))
if err != nil { if err != nil {
return nil, errors.Wrap(err, "error marshaling height") Log.Fatal("getBlockFromRPC bad height argument", height, err)
} }
params[0] = heightJSON params[0] = heightJSON
params[1] = json.RawMessage("0") // non-verbose (raw hex) params[1] = json.RawMessage("0") // non-verbose (raw hex)
@ -289,11 +289,8 @@ func stopIngestor() {
// BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them // BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them
// to the cache. The repetition count, rep, is nonzero only for unit-testing. // to the cache. The repetition count, rep, is nonzero only for unit-testing.
func BlockIngestor(c *BlockCache, rep int) { func BlockIngestor(c *BlockCache, rep int) {
lastLog := time.Now() lastLog := Time.Now()
reorgCount := 0
lastHeightLogged := 0 lastHeightLogged := 0
retryCount := 0
wait := true
// Start listening for new blocks // Start listening for new blocks
for i := 0; rep == 0 || i < rep; i++ { for i := 0; rep == 0 || i < rep; i++ {
@ -304,88 +301,60 @@ func BlockIngestor(c *BlockCache, rep int) {
default: default:
} }
height := c.GetNextHeight() result, err := RawRequest("getbestblockhash", []json.RawMessage{})
block, err := getBlockFromRPC(height)
if err != nil { if err != nil {
Log.WithFields(logrus.Fields{ Log.WithFields(logrus.Fields{
"height": height, "error": err,
"error": err, }).Fatal("error zcashd getbestblockhash rpc")
}).Warn("error zcashd getblock rpc") }
retryCount++ var hashHex string
if retryCount > 10 { err = json.Unmarshal(result, &hashHex)
Log.WithFields(logrus.Fields{ if err != nil {
"timeouts": retryCount, Log.Fatal("bad getbestblockhash return:", err, result)
}).Fatal("unable to issue RPC call to zcashd node") }
} lastBestBlockHash := []byte{}
// Delay then retry the same height. lastBestBlockHash, err = hex.DecodeString(hashHex)
if err != nil {
Log.Fatal("error decoding getbestblockhash", err, hashHex)
}
height := c.GetNextHeight()
if string(lastBestBlockHash) == string(parser.Reverse(c.GetLatestHash())) {
// Synced
c.Sync() c.Sync()
Time.Sleep(10 * time.Second) if lastHeightLogged != height-1 {
wait = true lastHeightLogged = height - 1
Log.Info("Waiting for block: ", height)
}
Time.Sleep(2 * time.Second)
lastLog = Time.Now()
continue continue
} }
retryCount = 0 var block *walletrpc.CompactBlock
if block == nil { block, err = getBlockFromRPC(height)
// No block at this height. if err != nil {
if height == c.GetFirstHeight() { Log.Fatal("getblock failed, will retry", err)
Log.Info("Waiting for zcashd height to reach Sapling activation height ",
"(", c.GetFirstHeight(), ")...")
reorgCount = 0
Time.Sleep(20 * time.Second)
continue
}
if wait {
// Wait a bit then retry the same height.
c.Sync()
if lastHeightLogged+1 != height {
Log.Info("Ingestor waiting for block: ", height)
lastHeightLogged = height - 1
}
Time.Sleep(2 * time.Second)
wait = false
continue
}
} }
if block == nil || c.HashMismatch(block.PrevHash) { if block != nil && c.HashMatch(block.PrevHash) {
// This may not be a reorg; it may be we're at the tip if err = c.Add(height, block); err != nil {
// and there's no new block yet, but we want to back up Log.Fatal("Cache add failed:", err)
// so we detect a reorg in which the new chain is the
// same length or shorter.
reorgCount++
if reorgCount > 100 {
Log.Fatal("Reorg exceeded max of 100 blocks! Help!")
} }
// Print the hash of the block that is getting reorg-ed away // Don't log these too often.
// as 'phash', not the prevhash of the block we just received. if DarksideEnabled || Time.Now().Sub(lastLog).Seconds() >= 4 {
if block != nil { lastLog = Time.Now()
Log.WithFields(logrus.Fields{ Log.Info("Adding block to cache ", height, " ", displayHash(block.Hash))
"height": height,
"hash": displayHash(block.Hash),
"phash": displayHash(c.GetLatestHash()),
"reorg": reorgCount,
}).Warn("REORG")
} else if reorgCount > 1 {
Log.WithFields(logrus.Fields{
"height": height,
"phash": displayHash(c.GetLatestHash()),
"reorg": reorgCount,
}).Warn("REORG")
} }
// Try backing up
c.Reorg(height - 1)
continue continue
} }
// We have a valid block to add. if height == c.GetFirstHeight() {
wait = true c.Sync()
reorgCount = 0 Log.Info("Waiting for zcashd height to reach Sapling activation height ",
if err := c.Add(height, block); err != nil { "(", c.GetFirstHeight(), ")...")
Log.Fatal("Cache add failed:", err) Time.Sleep(20 * time.Second)
} return
// Don't log these too often.
if time.Now().Sub(lastLog).Seconds() >= 4 && c.GetNextHeight() == height+1 && height != lastHeightLogged {
lastLog = time.Now()
lastHeightLogged = height
Log.Info("Ingestor adding block to cache: ", height)
} }
Log.Info("REORG: dropping block ", height-1, " ", displayHash(c.GetLatestHash()))
c.Reorg(height - 1)
} }
} }

View File

@ -34,6 +34,8 @@ var (
logger = logrus.New() logger = logrus.New()
blocks [][]byte // four test blocks blocks [][]byte // four test blocks
testcache *BlockCache
) )
// TestMain does common setup that's shared across multiple tests // TestMain does common setup that's shared across multiple tests
@ -60,6 +62,7 @@ func TestMain(m *testing.M) {
blockJSON, _ := json.Marshal(scan.Text()) blockJSON, _ := json.Marshal(scan.Text())
blocks = append(blocks, blockJSON) blocks = append(blocks, blockJSON)
} }
testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, true)
// Setup is done; run all tests. // Setup is done; run all tests.
exitcode := m.Run() exitcode := m.Run()
@ -160,8 +163,217 @@ func TestGetLightdInfo(t *testing.T) {
// ------------------------------------------ BlockIngestor() // ------------------------------------------ BlockIngestor()
func checkSleepMethod(count int, duration time.Duration, expected string, method string) {
if sleepCount != count {
testT.Fatal("unexpected sleep count")
}
if sleepDuration != duration*time.Second {
testT.Fatal("unexpected sleep duration")
}
if method != expected {
testT.Error("unexpected method")
}
}
// There are four test blocks, 0..3 // There are four test blocks, 0..3
func blockIngestorStub(method string, params []json.RawMessage) (json.RawMessage, error) {
step++
// request the first two blocks very quickly (syncing),
// then next block isn't yet available
switch step {
case 1:
checkSleepMethod(0, 0, "getbestblockhash", method)
// This hash doesn't matter, won't match anything
r, _ := json.Marshal("010101")
return r, nil
case 2:
checkSleepMethod(0, 0, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380640" {
testT.Fatal("incorrect height requested")
}
// height 380640
return blocks[0], nil
case 3:
checkSleepMethod(0, 0, "getbestblockhash", method)
// This hash doesn't matter, won't match anything
r, _ := json.Marshal("010101")
return r, nil
case 4:
checkSleepMethod(0, 0, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380641" {
testT.Fatal("incorrect height requested")
}
// height 380641
return blocks[1], nil
case 5:
// Return the expected block hash, so we're synced, should
// then sleep for 2 seconds, then another getbestblockhash
checkSleepMethod(0, 0, "getbestblockhash", method)
r, _ := json.Marshal(displayHash(testcache.GetLatestHash()))
return r, nil
case 6:
// Simulate still no new block, still synced, should
// sleep for 2 seconds, then another getbestblockhash
checkSleepMethod(1, 2, "getbestblockhash", method)
r, _ := json.Marshal(displayHash(testcache.GetLatestHash()))
return r, nil
case 7:
// Simulate new block (any non-matching hash will do)
checkSleepMethod(2, 4, "getbestblockhash", method)
r, _ := json.Marshal("aabb")
return r, nil
case 8:
checkSleepMethod(2, 4, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380642" {
testT.Fatal("incorrect height requested")
}
// height 380642
return blocks[2], nil
case 9:
// Simulate still no new block, still synced, should
// sleep for 2 seconds, then another getbestblockhash
checkSleepMethod(2, 4, "getbestblockhash", method)
r, _ := json.Marshal(displayHash(testcache.GetLatestHash()))
return r, nil
case 10:
// There are 3 blocks in the cache (380640-642), so let's
// simulate a 1-block reorg, new version (replacement) of 380642
checkSleepMethod(3, 6, "getbestblockhash", method)
// hash doesn't matter, just something that doesn't match
r, _ := json.Marshal("4545")
return r, nil
case 11:
// It thinks there may simply be a new block, but we'll say
// there is no block at this height (380642 was replaced).
checkSleepMethod(3, 6, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380643" {
testT.Fatal("incorrect height requested")
}
return nil, errors.New("-8: Block height out of range")
case 12:
// It will re-ask the best hash (let's make no change)
checkSleepMethod(3, 6, "getbestblockhash", method)
// hash doesn't matter, just something that doesn't match
r, _ := json.Marshal("4545")
return r, nil
case 13:
// It should have backed up one block
checkSleepMethod(3, 6, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380642" {
testT.Fatal("incorrect height requested")
}
// height 380642
return blocks[2], nil
case 14:
// We're back to the same state as case 9, and this time
// we'll make it back up 2 blocks (rather than one)
checkSleepMethod(3, 6, "getbestblockhash", method) // XXXXXXXXXXXXXXXXXXXXXXXXXXXXX XXX
// hash doesn't matter, just something that doesn't match
r, _ := json.Marshal("5656")
return r, nil
case 15:
// It thinks there may simply be a new block, but we'll say
// there is no block at this height (380642 was replaced).
checkSleepMethod(3, 6, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380643" {
testT.Fatal("incorrect height requested")
}
return nil, errors.New("-8: Block height out of range")
case 16:
checkSleepMethod(3, 6, "getbestblockhash", method)
// hash doesn't matter, just something that doesn't match
r, _ := json.Marshal("5656")
return r, nil
case 17:
// Like case 13, it should have backed up one block, but
// this time we'll make it back up one more
checkSleepMethod(3, 6, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380642" {
testT.Fatal("incorrect height requested")
}
return nil, errors.New("-8: Block height out of range")
case 18:
checkSleepMethod(3, 6, "getbestblockhash", method)
// hash doesn't matter, just something that doesn't match
r, _ := json.Marshal("5656")
return r, nil
case 19:
// It should have backed up one more
checkSleepMethod(3, 6, "getblock", method)
var height string
err := json.Unmarshal(params[0], &height)
if err != nil {
testT.Fatal("could not unmarshal height")
}
if height != "380641" {
testT.Fatal("incorrect height requested")
}
return blocks[1], nil
}
testT.Error("blockIngestorStub called too many times")
return nil, nil
}
func TestBlockIngestor(t *testing.T) {
testT = t
RawRequest = blockIngestorStub
Time.Sleep = sleepStub
Time.Now = nowStub
os.RemoveAll(unitTestPath)
testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, false)
BlockIngestor(testcache, 11)
if step != 19 {
t.Error("unexpected final step", step)
}
step = 0
sleepCount = 0
sleepDuration = 0
os.RemoveAll(unitTestPath)
}
// ------------------------------------------ GetBlockRange()
// There are four test blocks, 0..3
// (probably don't need all these cases)
func getblockStub(method string, params []json.RawMessage) (json.RawMessage, error) { func getblockStub(method string, params []json.RawMessage) (json.RawMessage, error) {
if method != "getblock" {
testT.Error("unexpected method")
}
var height string var height string
err := json.Unmarshal(params[0], &height) err := json.Unmarshal(params[0], &height)
if err != nil { if err != nil {
@ -272,27 +484,11 @@ func getblockStub(method string, params []json.RawMessage) (json.RawMessage, err
return nil, nil return nil, nil
} }
func TestBlockIngestor(t *testing.T) {
testT = t
RawRequest = getblockStub
Time.Sleep = sleepStub
os.RemoveAll(unitTestPath)
testcache := NewBlockCache(unitTestPath, unitTestChain, 380640, false)
BlockIngestor(testcache, 11)
if step != 11 {
t.Error("unexpected final step", step)
}
step = 0
sleepCount = 0
sleepDuration = 0
os.RemoveAll(unitTestPath)
}
func TestGetBlockRange(t *testing.T) { func TestGetBlockRange(t *testing.T) {
testT = t testT = t
RawRequest = getblockStub RawRequest = getblockStub
os.RemoveAll(unitTestPath) os.RemoveAll(unitTestPath)
testcache := NewBlockCache(unitTestPath, unitTestChain, 380640, true) testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, true)
blockChan := make(chan *walletrpc.CompactBlock) blockChan := make(chan *walletrpc.CompactBlock)
errChan := make(chan error) errChan := make(chan error)
go GetBlockRange(testcache, blockChan, errChan, 380640, 380642) go GetBlockRange(testcache, blockChan, errChan, 380640, 380642)
@ -371,7 +567,7 @@ func TestGetBlockRangeReverse(t *testing.T) {
testT = t testT = t
RawRequest = getblockStubReverse RawRequest = getblockStubReverse
os.RemoveAll(unitTestPath) os.RemoveAll(unitTestPath)
testcache := NewBlockCache(unitTestPath, unitTestChain, 380640, true) testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, true)
blockChan := make(chan *walletrpc.CompactBlock) blockChan := make(chan *walletrpc.CompactBlock)
errChan := make(chan error) errChan := make(chan error)

View File

@ -77,7 +77,7 @@ func DarksideInit(c *BlockCache, timeout int) {
// DarksideReset allows the wallet test code to specify values // DarksideReset allows the wallet test code to specify values
// that are returned by GetLightdInfo(). // that are returned by GetLightdInfo().
func DarksideReset(sa int, bi, cn string) error { func DarksideReset(sa int, bi, cn string) error {
Log.Info("Reset(saplingActivation=", sa, ")") Log.Info("DarksideReset(saplingActivation=", sa, ")")
stopIngestor() stopIngestor()
state = darksideState{ state = darksideState{
resetted: true, resetted: true,
@ -138,7 +138,7 @@ func setPrevhash() {
copy(blockBytes[4:4+32], prevhash) copy(blockBytes[4:4+32], prevhash)
} }
prevhash = block.GetEncodableHash() prevhash = block.GetEncodableHash()
Log.Info("active block height ", block.GetHeight(), " hash ", Log.Info("Darkside active block height ", block.GetHeight(), " hash ",
hex.EncodeToString(block.GetDisplayHash()), hex.EncodeToString(block.GetDisplayHash()),
" txcount ", block.GetTxCount()) " txcount ", block.GetTxCount())
} }
@ -153,7 +153,7 @@ func DarksideApplyStaged(height int) error {
if !state.resetted { if !state.resetted {
return errors.New("please call Reset first") return errors.New("please call Reset first")
} }
Log.Info("ApplyStaged(height=", height, ")") Log.Info("DarksideApplyStaged(height=", height, ")")
if height < state.startHeight { if height < state.startHeight {
return errors.New(fmt.Sprint("height ", height, return errors.New(fmt.Sprint("height ", height,
" is less than sapling activation height ", state.startHeight)) " is less than sapling activation height ", state.startHeight))
@ -212,9 +212,13 @@ func DarksideApplyStaged(height int) error {
block = append(block, tx.bytes...) block = append(block, tx.bytes...)
state.activeBlocks[tx.height-state.startHeight] = block state.activeBlocks[tx.height-state.startHeight] = block
} }
maxHeight := state.startHeight + len(state.activeBlocks) - 1
if height > maxHeight {
height = maxHeight
}
setPrevhash() setPrevhash()
state.latestHeight = height state.latestHeight = height
Log.Info("active blocks from ", state.startHeight, Log.Info("darkside: active blocks from ", state.startHeight,
" to ", state.startHeight+len(state.activeBlocks)-1, " to ", state.startHeight+len(state.activeBlocks)-1,
", latest presented height ", state.latestHeight) ", latest presented height ", state.latestHeight)
@ -244,7 +248,7 @@ func darksideStageBlock(caller string, b []byte) error {
if len(rest) != 0 { if len(rest) != 0 {
return errors.New("block serialization is too long") return errors.New("block serialization is too long")
} }
Log.Info(caller, "(height=", block.GetHeight(), ")") Log.Info(caller, "DarksideStageBlock(height=", block.GetHeight(), ")")
if block.GetHeight() < state.startHeight { if block.GetHeight() < state.startHeight {
return errors.New(fmt.Sprint("block height ", block.GetHeight(), return errors.New(fmt.Sprint("block height ", block.GetHeight(),
" is less than sapling activation height ", state.startHeight)) " is less than sapling activation height ", state.startHeight))
@ -259,7 +263,7 @@ func DarksideStageBlocks(url string) error {
if !state.resetted { if !state.resetted {
return errors.New("please call Reset first") return errors.New("please call Reset first")
} }
Log.Info("StageBlocks(url=", url, ")") Log.Info("DarksideStageBlocks(url=", url, ")")
resp, err := http.Get(url) resp, err := http.Get(url)
if err != nil { if err != nil {
return err return err
@ -292,7 +296,7 @@ func DarksideStageBlockStream(blockHex string) error {
if !state.resetted { if !state.resetted {
return errors.New("please call Reset first") return errors.New("please call Reset first")
} }
Log.Info("StageBlocksStream()") Log.Info("DarksideStageBlocksStream()")
blockBytes, err := hex.DecodeString(blockHex) blockBytes, err := hex.DecodeString(blockHex)
if err != nil { if err != nil {
return err return err
@ -308,7 +312,7 @@ func DarksideStageBlocksCreate(height int32, nonce int32, count int32) error {
if !state.resetted { if !state.resetted {
return errors.New("please call Reset first") return errors.New("please call Reset first")
} }
Log.Info("StageBlocksCreate(height=", height, ", nonce=", nonce, ", count=", count, ")") Log.Info("DarksideStageBlocksCreate(height=", height, ", nonce=", nonce, ", count=", count, ")")
for i := 0; i < int(count); i++ { for i := 0; i < int(count); i++ {
fakeCoinbase := "0400008085202f890100000000000000000000000000000000000000000000000000" + fakeCoinbase := "0400008085202f890100000000000000000000000000000000000000000000000000" +
@ -413,6 +417,18 @@ func darksideRawRequest(method string, params []json.RawMessage) (json.RawMessag
} }
return json.Marshal(hex.EncodeToString(state.activeBlocks[index])) return json.Marshal(hex.EncodeToString(state.activeBlocks[index]))
case "getbestblockhash":
state.mutex.RLock()
defer state.mutex.RUnlock()
if len(state.activeBlocks) == 0 {
Log.Fatal("getbestblockhash: no blocks")
}
index := state.latestHeight - state.startHeight
block := parser.NewBlock()
block.ParseFromSlice(state.activeBlocks[index])
hash := hex.EncodeToString(block.GetDisplayHash())
return json.Marshal(hash)
case "getaddresstxids": case "getaddresstxids":
// Not required for minimal reorg testing. // Not required for minimal reorg testing.
return nil, errors.New("not implemented yet") return nil, errors.New("not implemented yet")
@ -577,7 +593,7 @@ func DarksideStageTransactionsURL(height int, url string) error {
if !state.resetted { if !state.resetted {
return errors.New("please call Reset first") return errors.New("please call Reset first")
} }
Log.Info("StageTransactionsURL(height=", height, ", url=", url, ")") Log.Info("DarksideStageTransactionsURL(height=", height, ", url=", url, ")")
resp, err := http.Get(url) resp, err := http.Get(url)
if err != nil { if err != nil {
return err return err

View File

@ -94,6 +94,16 @@ block height to another. This happens in two parts, first we create and apply
the "before reorg" state. Then we create the "after reorg" stage and apply the "before reorg" state. Then we create the "after reorg" stage and apply
it, which makes the reorg happen. it, which makes the reorg happen.
Here's a quick-start guide to simulating a reorg:
```
grpcurl -plaintext -d '{"saplingActivation": 663150,"branchID": "bad", "chainName":"x"}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/Reset
grpcurl -plaintext -d '{"url": "https://raw.githubusercontent.com/zcash-hackworks/darksidewalletd-test-data/master/basic-reorg/663150.txt"}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/StageBlocks
grpcurl -plaintext -d '{"height":663151,"count":10}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/StageBlocksCreate
grpcurl -plaintext -d '{"height":663160}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/ApplyStaged
grpcurl -plaintext -d '{"height":663155,"count":10,"nonce":44}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/StageBlocksCreate
grpcurl -plaintext -d '{"height":663164}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/ApplyStaged
```
#### Creating the Before-Reorg State #### Creating the Before-Reorg State
If you haven't already started darksidewalletd, please start it: If you haven't already started darksidewalletd, please start it: