diff --git a/common/cache.go b/common/cache.go index 9cc7d5e..8b5ffb8 100644 --- a/common/cache.go +++ b/common/cache.go @@ -51,12 +51,12 @@ func (c *BlockCache) GetLatestHash() []byte { return c.latestHash } -// HashMismatch indicates if the given prev-hash doesn't match the most recent block's hash +// HashMatch indicates if the given prev-hash matches the most recent block's hash // so reorgs can be detected. -func (c *BlockCache) HashMismatch(prevhash []byte) bool { +func (c *BlockCache) HashMatch(prevhash []byte) bool { c.mutex.RLock() defer c.mutex.RUnlock() - return c.latestHash != nil && !bytes.Equal(c.latestHash, prevhash) + return c.latestHash == nil || bytes.Equal(c.latestHash, prevhash) } // Make the block at the given height the lowest height that we don't have. diff --git a/common/common.go b/common/common.go index 2fe8323..a3a1e22 100644 --- a/common/common.go +++ b/common/common.go @@ -226,7 +226,7 @@ func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) { params := make([]json.RawMessage, 2) heightJSON, err := json.Marshal(strconv.Itoa(height)) if err != nil { - return nil, errors.Wrap(err, "error marshaling height") + Log.Fatal("getBlockFromRPC bad height argument", height, err) } params[0] = heightJSON params[1] = json.RawMessage("0") // non-verbose (raw hex) @@ -289,11 +289,8 @@ func stopIngestor() { // BlockIngestor runs as a goroutine and polls zcashd for new blocks, adding them // to the cache. The repetition count, rep, is nonzero only for unit-testing. func BlockIngestor(c *BlockCache, rep int) { - lastLog := time.Now() - reorgCount := 0 + lastLog := Time.Now() lastHeightLogged := 0 - retryCount := 0 - wait := true // Start listening for new blocks for i := 0; rep == 0 || i < rep; i++ { @@ -304,88 +301,60 @@ func BlockIngestor(c *BlockCache, rep int) { default: } - height := c.GetNextHeight() - block, err := getBlockFromRPC(height) + result, err := RawRequest("getbestblockhash", []json.RawMessage{}) if err != nil { Log.WithFields(logrus.Fields{ - "height": height, - "error": err, - }).Warn("error zcashd getblock rpc") - retryCount++ - if retryCount > 10 { - Log.WithFields(logrus.Fields{ - "timeouts": retryCount, - }).Fatal("unable to issue RPC call to zcashd node") - } - // Delay then retry the same height. + "error": err, + }).Fatal("error zcashd getbestblockhash rpc") + } + var hashHex string + err = json.Unmarshal(result, &hashHex) + if err != nil { + Log.Fatal("bad getbestblockhash return:", err, result) + } + lastBestBlockHash := []byte{} + lastBestBlockHash, err = hex.DecodeString(hashHex) + if err != nil { + Log.Fatal("error decoding getbestblockhash", err, hashHex) + } + + height := c.GetNextHeight() + if string(lastBestBlockHash) == string(parser.Reverse(c.GetLatestHash())) { + // Synced c.Sync() - Time.Sleep(10 * time.Second) - wait = true + if lastHeightLogged != height-1 { + lastHeightLogged = height - 1 + Log.Info("Waiting for block: ", height) + } + Time.Sleep(2 * time.Second) + lastLog = Time.Now() continue } - retryCount = 0 - if block == nil { - // No block at this height. - if height == c.GetFirstHeight() { - Log.Info("Waiting for zcashd height to reach Sapling activation height ", - "(", c.GetFirstHeight(), ")...") - reorgCount = 0 - Time.Sleep(20 * time.Second) - continue - } - if wait { - // Wait a bit then retry the same height. - c.Sync() - if lastHeightLogged+1 != height { - Log.Info("Ingestor waiting for block: ", height) - lastHeightLogged = height - 1 - } - Time.Sleep(2 * time.Second) - wait = false - continue - } + var block *walletrpc.CompactBlock + block, err = getBlockFromRPC(height) + if err != nil { + Log.Fatal("getblock failed, will retry", err) } - if block == nil || c.HashMismatch(block.PrevHash) { - // This may not be a reorg; it may be we're at the tip - // and there's no new block yet, but we want to back up - // so we detect a reorg in which the new chain is the - // same length or shorter. - reorgCount++ - if reorgCount > 100 { - Log.Fatal("Reorg exceeded max of 100 blocks! Help!") + if block != nil && c.HashMatch(block.PrevHash) { + if err = c.Add(height, block); err != nil { + Log.Fatal("Cache add failed:", err) } - // Print the hash of the block that is getting reorg-ed away - // as 'phash', not the prevhash of the block we just received. - if block != nil { - Log.WithFields(logrus.Fields{ - "height": height, - "hash": displayHash(block.Hash), - "phash": displayHash(c.GetLatestHash()), - "reorg": reorgCount, - }).Warn("REORG") - } else if reorgCount > 1 { - Log.WithFields(logrus.Fields{ - "height": height, - "phash": displayHash(c.GetLatestHash()), - "reorg": reorgCount, - }).Warn("REORG") + // Don't log these too often. + if DarksideEnabled || Time.Now().Sub(lastLog).Seconds() >= 4 { + lastLog = Time.Now() + Log.Info("Adding block to cache ", height, " ", displayHash(block.Hash)) } - // Try backing up - c.Reorg(height - 1) continue } - // We have a valid block to add. - wait = true - reorgCount = 0 - if err := c.Add(height, block); err != nil { - Log.Fatal("Cache add failed:", err) - } - // Don't log these too often. - if time.Now().Sub(lastLog).Seconds() >= 4 && c.GetNextHeight() == height+1 && height != lastHeightLogged { - lastLog = time.Now() - lastHeightLogged = height - Log.Info("Ingestor adding block to cache: ", height) + if height == c.GetFirstHeight() { + c.Sync() + Log.Info("Waiting for zcashd height to reach Sapling activation height ", + "(", c.GetFirstHeight(), ")...") + Time.Sleep(20 * time.Second) + return } + Log.Info("REORG: dropping block ", height-1, " ", displayHash(c.GetLatestHash())) + c.Reorg(height - 1) } } diff --git a/common/common_test.go b/common/common_test.go index 5006806..af2b3aa 100644 --- a/common/common_test.go +++ b/common/common_test.go @@ -34,6 +34,8 @@ var ( logger = logrus.New() blocks [][]byte // four test blocks + + testcache *BlockCache ) // TestMain does common setup that's shared across multiple tests @@ -60,6 +62,7 @@ func TestMain(m *testing.M) { blockJSON, _ := json.Marshal(scan.Text()) blocks = append(blocks, blockJSON) } + testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, true) // Setup is done; run all tests. exitcode := m.Run() @@ -160,8 +163,217 @@ func TestGetLightdInfo(t *testing.T) { // ------------------------------------------ BlockIngestor() +func checkSleepMethod(count int, duration time.Duration, expected string, method string) { + if sleepCount != count { + testT.Fatal("unexpected sleep count") + } + if sleepDuration != duration*time.Second { + testT.Fatal("unexpected sleep duration") + } + if method != expected { + testT.Error("unexpected method") + } +} + // There are four test blocks, 0..3 +func blockIngestorStub(method string, params []json.RawMessage) (json.RawMessage, error) { + step++ + // request the first two blocks very quickly (syncing), + // then next block isn't yet available + switch step { + case 1: + checkSleepMethod(0, 0, "getbestblockhash", method) + // This hash doesn't matter, won't match anything + r, _ := json.Marshal("010101") + return r, nil + case 2: + checkSleepMethod(0, 0, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380640" { + testT.Fatal("incorrect height requested") + } + // height 380640 + return blocks[0], nil + case 3: + checkSleepMethod(0, 0, "getbestblockhash", method) + // This hash doesn't matter, won't match anything + r, _ := json.Marshal("010101") + return r, nil + case 4: + checkSleepMethod(0, 0, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380641" { + testT.Fatal("incorrect height requested") + } + // height 380641 + return blocks[1], nil + case 5: + // Return the expected block hash, so we're synced, should + // then sleep for 2 seconds, then another getbestblockhash + checkSleepMethod(0, 0, "getbestblockhash", method) + r, _ := json.Marshal(displayHash(testcache.GetLatestHash())) + return r, nil + case 6: + // Simulate still no new block, still synced, should + // sleep for 2 seconds, then another getbestblockhash + checkSleepMethod(1, 2, "getbestblockhash", method) + r, _ := json.Marshal(displayHash(testcache.GetLatestHash())) + return r, nil + case 7: + // Simulate new block (any non-matching hash will do) + checkSleepMethod(2, 4, "getbestblockhash", method) + r, _ := json.Marshal("aabb") + return r, nil + case 8: + checkSleepMethod(2, 4, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380642" { + testT.Fatal("incorrect height requested") + } + // height 380642 + return blocks[2], nil + case 9: + // Simulate still no new block, still synced, should + // sleep for 2 seconds, then another getbestblockhash + checkSleepMethod(2, 4, "getbestblockhash", method) + r, _ := json.Marshal(displayHash(testcache.GetLatestHash())) + return r, nil + case 10: + // There are 3 blocks in the cache (380640-642), so let's + // simulate a 1-block reorg, new version (replacement) of 380642 + checkSleepMethod(3, 6, "getbestblockhash", method) + // hash doesn't matter, just something that doesn't match + r, _ := json.Marshal("4545") + return r, nil + case 11: + // It thinks there may simply be a new block, but we'll say + // there is no block at this height (380642 was replaced). + checkSleepMethod(3, 6, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380643" { + testT.Fatal("incorrect height requested") + } + return nil, errors.New("-8: Block height out of range") + case 12: + // It will re-ask the best hash (let's make no change) + checkSleepMethod(3, 6, "getbestblockhash", method) + // hash doesn't matter, just something that doesn't match + r, _ := json.Marshal("4545") + return r, nil + case 13: + // It should have backed up one block + checkSleepMethod(3, 6, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380642" { + testT.Fatal("incorrect height requested") + } + // height 380642 + return blocks[2], nil + case 14: + // We're back to the same state as case 9, and this time + // we'll make it back up 2 blocks (rather than one) + checkSleepMethod(3, 6, "getbestblockhash", method) // XXXXXXXXXXXXXXXXXXXXXXXXXXXXX XXX + // hash doesn't matter, just something that doesn't match + r, _ := json.Marshal("5656") + return r, nil + case 15: + // It thinks there may simply be a new block, but we'll say + // there is no block at this height (380642 was replaced). + checkSleepMethod(3, 6, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380643" { + testT.Fatal("incorrect height requested") + } + return nil, errors.New("-8: Block height out of range") + case 16: + checkSleepMethod(3, 6, "getbestblockhash", method) + // hash doesn't matter, just something that doesn't match + r, _ := json.Marshal("5656") + return r, nil + case 17: + // Like case 13, it should have backed up one block, but + // this time we'll make it back up one more + checkSleepMethod(3, 6, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380642" { + testT.Fatal("incorrect height requested") + } + return nil, errors.New("-8: Block height out of range") + case 18: + checkSleepMethod(3, 6, "getbestblockhash", method) + // hash doesn't matter, just something that doesn't match + r, _ := json.Marshal("5656") + return r, nil + case 19: + // It should have backed up one more + checkSleepMethod(3, 6, "getblock", method) + var height string + err := json.Unmarshal(params[0], &height) + if err != nil { + testT.Fatal("could not unmarshal height") + } + if height != "380641" { + testT.Fatal("incorrect height requested") + } + return blocks[1], nil + } + testT.Error("blockIngestorStub called too many times") + return nil, nil +} + +func TestBlockIngestor(t *testing.T) { + testT = t + RawRequest = blockIngestorStub + Time.Sleep = sleepStub + Time.Now = nowStub + os.RemoveAll(unitTestPath) + testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, false) + BlockIngestor(testcache, 11) + if step != 19 { + t.Error("unexpected final step", step) + } + step = 0 + sleepCount = 0 + sleepDuration = 0 + os.RemoveAll(unitTestPath) +} + +// ------------------------------------------ GetBlockRange() + +// There are four test blocks, 0..3 +// (probably don't need all these cases) func getblockStub(method string, params []json.RawMessage) (json.RawMessage, error) { + if method != "getblock" { + testT.Error("unexpected method") + } var height string err := json.Unmarshal(params[0], &height) if err != nil { @@ -272,27 +484,11 @@ func getblockStub(method string, params []json.RawMessage) (json.RawMessage, err return nil, nil } -func TestBlockIngestor(t *testing.T) { - testT = t - RawRequest = getblockStub - Time.Sleep = sleepStub - os.RemoveAll(unitTestPath) - testcache := NewBlockCache(unitTestPath, unitTestChain, 380640, false) - BlockIngestor(testcache, 11) - if step != 11 { - t.Error("unexpected final step", step) - } - step = 0 - sleepCount = 0 - sleepDuration = 0 - os.RemoveAll(unitTestPath) -} - func TestGetBlockRange(t *testing.T) { testT = t RawRequest = getblockStub os.RemoveAll(unitTestPath) - testcache := NewBlockCache(unitTestPath, unitTestChain, 380640, true) + testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, true) blockChan := make(chan *walletrpc.CompactBlock) errChan := make(chan error) go GetBlockRange(testcache, blockChan, errChan, 380640, 380642) @@ -371,7 +567,7 @@ func TestGetBlockRangeReverse(t *testing.T) { testT = t RawRequest = getblockStubReverse os.RemoveAll(unitTestPath) - testcache := NewBlockCache(unitTestPath, unitTestChain, 380640, true) + testcache = NewBlockCache(unitTestPath, unitTestChain, 380640, true) blockChan := make(chan *walletrpc.CompactBlock) errChan := make(chan error) diff --git a/common/darkside.go b/common/darkside.go index 535d9ae..70608d3 100644 --- a/common/darkside.go +++ b/common/darkside.go @@ -77,7 +77,7 @@ func DarksideInit(c *BlockCache, timeout int) { // DarksideReset allows the wallet test code to specify values // that are returned by GetLightdInfo(). func DarksideReset(sa int, bi, cn string) error { - Log.Info("Reset(saplingActivation=", sa, ")") + Log.Info("DarksideReset(saplingActivation=", sa, ")") stopIngestor() state = darksideState{ resetted: true, @@ -138,7 +138,7 @@ func setPrevhash() { copy(blockBytes[4:4+32], prevhash) } prevhash = block.GetEncodableHash() - Log.Info("active block height ", block.GetHeight(), " hash ", + Log.Info("Darkside active block height ", block.GetHeight(), " hash ", hex.EncodeToString(block.GetDisplayHash()), " txcount ", block.GetTxCount()) } @@ -153,7 +153,7 @@ func DarksideApplyStaged(height int) error { if !state.resetted { return errors.New("please call Reset first") } - Log.Info("ApplyStaged(height=", height, ")") + Log.Info("DarksideApplyStaged(height=", height, ")") if height < state.startHeight { return errors.New(fmt.Sprint("height ", height, " is less than sapling activation height ", state.startHeight)) @@ -212,9 +212,13 @@ func DarksideApplyStaged(height int) error { block = append(block, tx.bytes...) state.activeBlocks[tx.height-state.startHeight] = block } + maxHeight := state.startHeight + len(state.activeBlocks) - 1 + if height > maxHeight { + height = maxHeight + } setPrevhash() state.latestHeight = height - Log.Info("active blocks from ", state.startHeight, + Log.Info("darkside: active blocks from ", state.startHeight, " to ", state.startHeight+len(state.activeBlocks)-1, ", latest presented height ", state.latestHeight) @@ -244,7 +248,7 @@ func darksideStageBlock(caller string, b []byte) error { if len(rest) != 0 { return errors.New("block serialization is too long") } - Log.Info(caller, "(height=", block.GetHeight(), ")") + Log.Info(caller, "DarksideStageBlock(height=", block.GetHeight(), ")") if block.GetHeight() < state.startHeight { return errors.New(fmt.Sprint("block height ", block.GetHeight(), " is less than sapling activation height ", state.startHeight)) @@ -259,7 +263,7 @@ func DarksideStageBlocks(url string) error { if !state.resetted { return errors.New("please call Reset first") } - Log.Info("StageBlocks(url=", url, ")") + Log.Info("DarksideStageBlocks(url=", url, ")") resp, err := http.Get(url) if err != nil { return err @@ -292,7 +296,7 @@ func DarksideStageBlockStream(blockHex string) error { if !state.resetted { return errors.New("please call Reset first") } - Log.Info("StageBlocksStream()") + Log.Info("DarksideStageBlocksStream()") blockBytes, err := hex.DecodeString(blockHex) if err != nil { return err @@ -308,7 +312,7 @@ func DarksideStageBlocksCreate(height int32, nonce int32, count int32) error { if !state.resetted { return errors.New("please call Reset first") } - Log.Info("StageBlocksCreate(height=", height, ", nonce=", nonce, ", count=", count, ")") + Log.Info("DarksideStageBlocksCreate(height=", height, ", nonce=", nonce, ", count=", count, ")") for i := 0; i < int(count); i++ { fakeCoinbase := "0400008085202f890100000000000000000000000000000000000000000000000000" + @@ -413,6 +417,18 @@ func darksideRawRequest(method string, params []json.RawMessage) (json.RawMessag } return json.Marshal(hex.EncodeToString(state.activeBlocks[index])) + case "getbestblockhash": + state.mutex.RLock() + defer state.mutex.RUnlock() + if len(state.activeBlocks) == 0 { + Log.Fatal("getbestblockhash: no blocks") + } + index := state.latestHeight - state.startHeight + block := parser.NewBlock() + block.ParseFromSlice(state.activeBlocks[index]) + hash := hex.EncodeToString(block.GetDisplayHash()) + return json.Marshal(hash) + case "getaddresstxids": // Not required for minimal reorg testing. return nil, errors.New("not implemented yet") @@ -577,7 +593,7 @@ func DarksideStageTransactionsURL(height int, url string) error { if !state.resetted { return errors.New("please call Reset first") } - Log.Info("StageTransactionsURL(height=", height, ", url=", url, ")") + Log.Info("DarksideStageTransactionsURL(height=", height, ", url=", url, ")") resp, err := http.Get(url) if err != nil { return err diff --git a/docs/darksidewalletd.md b/docs/darksidewalletd.md index ec44ae2..afccdfb 100644 --- a/docs/darksidewalletd.md +++ b/docs/darksidewalletd.md @@ -94,6 +94,16 @@ block height to another. This happens in two parts, first we create and apply the "before reorg" state. Then we create the "after reorg" stage and apply it, which makes the reorg happen. +Here's a quick-start guide to simulating a reorg: +``` +grpcurl -plaintext -d '{"saplingActivation": 663150,"branchID": "bad", "chainName":"x"}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/Reset +grpcurl -plaintext -d '{"url": "https://raw.githubusercontent.com/zcash-hackworks/darksidewalletd-test-data/master/basic-reorg/663150.txt"}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/StageBlocks +grpcurl -plaintext -d '{"height":663151,"count":10}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/StageBlocksCreate +grpcurl -plaintext -d '{"height":663160}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/ApplyStaged +grpcurl -plaintext -d '{"height":663155,"count":10,"nonce":44}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/StageBlocksCreate +grpcurl -plaintext -d '{"height":663164}' localhost:9067 cash.z.wallet.sdk.rpc.DarksideStreamer/ApplyStaged +``` + #### Creating the Before-Reorg State If you haven't already started darksidewalletd, please start it: