Monitor for 1-block reorgs as well

This commit is contained in:
Aditya Kulkarni 2021-07-26 10:33:28 -07:00
parent d461c18785
commit 7f81fcbf78
2 changed files with 50 additions and 9 deletions

View File

@ -217,6 +217,24 @@ func GetLightdInfo() (*walletrpc.LightdInfo, error) {
}, nil }, nil
} }
func getBestBlockHash() ([]byte, error) {
result, rpcErr := RawRequest("getbestblockhash", []json.RawMessage{})
if rpcErr != nil {
return nil, rpcErr
}
var hash string
err := json.Unmarshal(result, &hash)
if err != nil {
return nil, err
}
hashbytes, err := hex.DecodeString(hash)
if err != nil {
return nil, err
}
return parser.Reverse(hashbytes), nil
}
func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) { func getBlockFromRPC(height int) (*walletrpc.CompactBlock, error) {
params := make([]json.RawMessage, 2) params := make([]json.RawMessage, 2)
heightJSON, err := json.Marshal(strconv.Itoa(height)) heightJSON, err := json.Marshal(strconv.Itoa(height))
@ -328,6 +346,28 @@ func BlockIngestor(c *BlockCache, rep int) {
Sleep(20 * time.Second) Sleep(20 * time.Second)
continue continue
} }
// Check the current top block to see if there's a hash mismatch (i.e., a 1-block reorg)
curhash, err := getBestBlockHash()
if err != nil {
Log.WithFields(logrus.Fields{
"height": height,
"error": err,
}).Warn("error zcashd getblock rpc")
continue
}
if c.HashMismatch(curhash) {
// Current block has a hash mismatch
Log.WithFields(logrus.Fields{
"height": height - 1,
"hash": displayHash(curhash),
"phash": displayHash(c.GetLatestHash()),
"reorg": reorgCount,
}).Warn("REORG/Current Block")
c.Reorg(height - 1)
continue
}
if wait { if wait {
// Wait a bit then retry the same height. // Wait a bit then retry the same height.
c.Sync() c.Sync()
@ -376,7 +416,7 @@ func BlockIngestor(c *BlockCache, rep int) {
Log.Fatal("Cache add failed:", err) Log.Fatal("Cache add failed:", err)
} }
// Don't log these too often. // Don't log these too often.
if time.Now().Sub(lastLog).Seconds() >= 4 && c.GetNextHeight() == height+1 && height != lastHeightLogged { if time.Since(lastLog).Seconds() >= 4 && c.GetNextHeight() == height+1 && height != lastHeightLogged {
lastLog = time.Now() lastLog = time.Now()
lastHeightLogged = height lastHeightLogged = height
Log.Info("Ingestor adding block to cache: ", height) Log.Info("Ingestor adding block to cache: ", height)

View File

@ -1,6 +1,7 @@
package common package common
import ( import (
"bytes"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"sync" "sync"
@ -17,8 +18,8 @@ var (
// List of all clients waiting to recieve mempool txns // List of all clients waiting to recieve mempool txns
clients []chan<- *walletrpc.RawTransaction clients []chan<- *walletrpc.RawTransaction
// Last height of the blocks. If this changes, then close all the clients and flush the mempool // Latest hash of the blocks. If this changes, then close all the clients and flush the mempool
lastHeight int lastHash []byte
// A pointer to the blockcache // A pointer to the blockcache
blockcache *BlockCache blockcache *BlockCache
@ -52,7 +53,7 @@ func AddNewClient(client chan<- *walletrpc.RawTransaction) {
// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients // RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients
func refreshMempoolTxns() error { func refreshMempoolTxns() error {
Log.Infoln("Refreshing mempool") //Log.Infoln("Refreshing mempool")
// First check if another refresh is running, if it is, just return // First check if another refresh is running, if it is, just return
if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) { if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) {
@ -70,8 +71,8 @@ func refreshMempoolTxns() error {
lock.Lock() lock.Lock()
defer lock.Unlock() defer lock.Unlock()
if lastHeight < blockcache.GetLatestHeight() { if !bytes.Equal(lastHash, blockcache.GetLatestHash()) {
Log.Infoln("Block height changed, clearing everything") Log.Infoln("Block hash changed, clearing mempool clients")
// Flush all the clients // Flush all the clients
for _, client := range clients { for _, client := range clients {
@ -85,7 +86,7 @@ func refreshMempoolTxns() error {
// Clear txns // Clear txns
txns = make(map[string]*walletrpc.RawTransaction) txns = make(map[string]*walletrpc.RawTransaction)
lastHeight = blockcache.GetLatestHeight() lastHash = blockcache.GetLatestHash()
} }
var mempoolList []string var mempoolList []string
@ -131,7 +132,7 @@ func refreshMempoolTxns() error {
newRtx := &walletrpc.RawTransaction{ newRtx := &walletrpc.RawTransaction{
Data: txBytes, Data: txBytes,
Height: uint64(lastHeight), Height: uint64(blockcache.GetLatestHeight()),
} }
// Notify waiting clients // Notify waiting clients
@ -154,7 +155,7 @@ func StartMempoolMonitor(cache *BlockCache, done <-chan bool) {
go func() { go func() {
ticker := time.NewTicker(2 * time.Second) ticker := time.NewTicker(2 * time.Second)
blockcache = cache blockcache = cache
lastHeight = blockcache.GetLatestHeight() lastHash = blockcache.GetLatestHash()
for { for {
select { select {