From 7f81fcbf78406ef677ba9e668850a80970b9ec4b Mon Sep 17 00:00:00 2001 From: Aditya Kulkarni Date: Mon, 26 Jul 2021 10:33:28 -0700 Subject: [PATCH] Monitor for 1-block reorgs as well --- common/common.go | 42 +++++++++++++++++++++++++++++++++++++++++- common/mempool.go | 17 +++++++++-------- 2 files changed, 50 insertions(+), 9 deletions(-) diff --git a/common/common.go b/common/common.go index b0889ad..9b6cc81 100644 --- a/common/common.go +++ b/common/common.go @@ -217,6 +217,24 @@ func GetLightdInfo() (*walletrpc.LightdInfo, error) { }, nil } +func getBestBlockHash() ([]byte, error) { + result, rpcErr := RawRequest("getbestblockhash", []json.RawMessage{}) + if rpcErr != nil { + return nil, rpcErr + } + var hash string + err := json.Unmarshal(result, &hash) + if err != nil { + return nil, err + } + hashbytes, err := hex.DecodeString(hash) + if err != nil { + return nil, err + } + + return parser.Reverse(hashbytes), nil +} + func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) { params := make([]json.RawMessage, 2) heightJSON, err := json.Marshal(strconv.Itoa(height)) @@ -328,6 +346,28 @@ func BlockIngestor(c *BlockCache, rep int) { Sleep(20 * time.Second) continue } + + // Check the current top block to see if there's a hash mismatch (i.e., a 1-block reorg) + curhash, err := getBestBlockHash() + if err != nil { + Log.WithFields(logrus.Fields{ + "height": height, + "error": err, + }).Warn("error zcashd getblock rpc") + continue + } + if c.HashMismatch(curhash) { + // Current block has a hash mismatch + Log.WithFields(logrus.Fields{ + "height": height - 1, + "hash": displayHash(curhash), + "phash": displayHash(c.GetLatestHash()), + "reorg": reorgCount, + }).Warn("REORG/Current Block") + c.Reorg(height - 1) + continue + } + if wait { // Wait a bit then retry the same height. c.Sync() @@ -376,7 +416,7 @@ func BlockIngestor(c *BlockCache, rep int) { Log.Fatal("Cache add failed:", err) } // Don't log these too often. - if time.Now().Sub(lastLog).Seconds() >= 4 && c.GetNextHeight() == height+1 && height != lastHeightLogged { + if time.Since(lastLog).Seconds() >= 4 && c.GetNextHeight() == height+1 && height != lastHeightLogged { lastLog = time.Now() lastHeightLogged = height Log.Info("Ingestor adding block to cache: ", height) diff --git a/common/mempool.go b/common/mempool.go index 9c19473..f498b48 100644 --- a/common/mempool.go +++ b/common/mempool.go @@ -1,6 +1,7 @@ package common import ( + "bytes" "encoding/hex" "encoding/json" "sync" @@ -17,8 +18,8 @@ var ( // List of all clients waiting to recieve mempool txns clients []chan<- *walletrpc.RawTransaction - // Last height of the blocks. If this changes, then close all the clients and flush the mempool - lastHeight int + // Latest hash of the blocks. If this changes, then close all the clients and flush the mempool + lastHash []byte // A pointer to the blockcache blockcache *BlockCache @@ -52,7 +53,7 @@ func AddNewClient(client chan<- *walletrpc.RawTransaction) { // RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients func refreshMempoolTxns() error { - Log.Infoln("Refreshing mempool") + //Log.Infoln("Refreshing mempool") // First check if another refresh is running, if it is, just return if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) { @@ -70,8 +71,8 @@ func refreshMempoolTxns() error { lock.Lock() defer lock.Unlock() - if lastHeight < blockcache.GetLatestHeight() { - Log.Infoln("Block height changed, clearing everything") + if !bytes.Equal(lastHash, blockcache.GetLatestHash()) { + Log.Infoln("Block hash changed, clearing mempool clients") // Flush all the clients for _, client := range clients { @@ -85,7 +86,7 @@ func refreshMempoolTxns() error { // Clear txns txns = make(map[string]*walletrpc.RawTransaction) - lastHeight = blockcache.GetLatestHeight() + lastHash = blockcache.GetLatestHash() } var mempoolList []string @@ -131,7 +132,7 @@ func refreshMempoolTxns() error { newRtx := &walletrpc.RawTransaction{ Data: txBytes, - Height: uint64(lastHeight), + Height: uint64(blockcache.GetLatestHeight()), } // Notify waiting clients @@ -154,7 +155,7 @@ func StartMempoolMonitor(cache *BlockCache, done <-chan bool) { go func() { ticker := time.NewTicker(2 * time.Second) blockcache = cache - lastHeight = blockcache.GetLatestHeight() + lastHash = blockcache.GetLatestHash() for { select {