From c3e1b98d4f031fb45f9bbe37cde604895ca39ff7 Mon Sep 17 00:00:00 2001 From: Aditya Kulkarni Date: Wed, 14 Jul 2021 14:48:57 -0700 Subject: [PATCH] Mempool Streaming API --- cmd/root.go | 5 ++ common/mempool.go | 177 ++++++++++++++++++++++++++++++++++++++++ frontend/service.go | 21 +++++ walletrpc/service.proto | 4 + 4 files changed, 207 insertions(+) create mode 100644 common/mempool.go diff --git a/cmd/root.go b/cmd/root.go index 48ab97c..3a61e4a 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -267,6 +267,10 @@ func startServer(opts *common.Options) error { walletrpc.RegisterDarksideStreamerServer(server, service) } + // Initialize mempool monitor + exitMempool := make(chan bool) + common.StartMempoolMonitor(cache, exitMempool) + // Start listening listener, err := net.Listen("tcp", opts.GRPCBindAddr) if err != nil { @@ -282,6 +286,7 @@ func startServer(opts *common.Options) error { go func() { s := <-signals cache.Sync() + exitMempool <- true common.Log.WithFields(logrus.Fields{ "signal": s.String(), }).Info("caught signal, stopping gRPC server") diff --git a/common/mempool.go b/common/mempool.go new file mode 100644 index 0000000..7cdccbf --- /dev/null +++ b/common/mempool.go @@ -0,0 +1,177 @@ +package common + +import ( + "encoding/hex" + "encoding/json" + "sync" + "sync/atomic" + "time" + + "github.com/zcash/lightwalletd/walletrpc" +) + +var ( + // List of all mempool transactions + txns map[string]*walletrpc.RawTransaction = make(map[string]*walletrpc.RawTransaction) + + // List of all clients waiting to recieve mempool txns + clients []chan<- *walletrpc.RawTransaction + + // Last height of the blocks. If this changes, then close all the clients and flush the mempool + lastHeight int + + // A pointer to the blockcache + blockcache *BlockCache + + // Mutex to lock the above 2 structs + lock sync.Mutex + + // Since the mutex doesn't have a "try_lock" method, we'll have to improvize with this + refreshing int32 = 0 +) + +// AddNewClient adds a new client to the list of clients to notify for mempool txns +func AddNewClient(client chan<- *walletrpc.RawTransaction) { + lock.Lock() + defer lock.Unlock() + + //Log.Infoln("Adding new client, sending ", len(txns), " transactions") + + // Also send all pending mempool txns + for _, rtx := range txns { + if client != nil { + client <- rtx + } + } + + if client != nil { + clients = append(clients, client) + } +} + +// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients +func refreshMempoolTxns() error { + Log.Infoln("Refreshing mempool") + + // First check if another refresh is running, if it is, just return + if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) { + Log.Warnln("Another refresh in progress, returning") + return nil + } + + // Set refreshing to 0 when we exit + defer func() { + refreshing = 0 + }() + + // Check if the blockchain has changed, and if it has, then clear everything + + lock.Lock() + defer lock.Unlock() + + if lastHeight < blockcache.GetLatestHeight() { + Log.Infoln("Block height changed, clearing everything") + + // Flush all the clients + for _, client := range clients { + if client != nil { + close(client) + } + } + + clients = make([]chan<- *walletrpc.RawTransaction, 0) + + // Clear txns + txns = make(map[string]*walletrpc.RawTransaction) + + lastHeight = blockcache.GetLatestHeight() + } + + var mempoolList []string + params := make([]json.RawMessage, 0) + result, rpcErr := RawRequest("getrawmempool", params) + if rpcErr != nil { + return rpcErr + } + err := json.Unmarshal(result, &mempoolList) + if err != nil { + return err + } + + //println("getrawmempool size ", len(mempoolList)) + + // Fetch all new mempool txns and add them into `newTxns` + for _, txidstr := range mempoolList { + if _, ok := txns[txidstr]; !ok { + txidJSON, err := json.Marshal(txidstr) + if err != nil { + return err + } + // The "0" is because we only need the raw hex, which is returned as + // just a hex string, and not even a json string (with quotes). + params := []json.RawMessage{txidJSON, json.RawMessage("0")} + result, rpcErr := RawRequest("getrawtransaction", params) + if rpcErr != nil { + // Not an error; mempool transactions can disappear + continue + } + // strip the quotes + var txStr string + err = json.Unmarshal(result, &txStr) + if err != nil { + return err + } + + // conver to binary + txBytes, err := hex.DecodeString(txStr) + if err != nil { + return err + } + + newRtx := &walletrpc.RawTransaction{ + Data: txBytes, + Height: uint64(lastHeight), + } + + // Notify waiting clients + for _, client := range clients { + if client != nil { + client <- newRtx + } + } + + Log.Infoln("Adding new mempool txid", txidstr, " sending to ", len(clients), " clients") + txns[txidstr] = newRtx + } + } + + return nil +} + +// StartMempoolMonitor starts monitoring the mempool +func StartMempoolMonitor(cache *BlockCache, done <-chan bool) { + go func() { + ticker := time.NewTicker(2 * time.Second) + blockcache = cache + lastHeight = blockcache.GetLatestHeight() + + for { + select { + case <-ticker.C: + go func() { + //Log.Infoln("Ticker triggered") + err := refreshMempoolTxns() + if err != nil { + Log.Errorln("Mempool refresh error:", err.Error()) + } + }() + + case <-done: + for _, client := range clients { + close(client) + } + return + } + } + }() +} diff --git a/frontend/service.go b/frontend/service.go index 8744172..a22c8de 100644 --- a/frontend/service.go +++ b/frontend/service.go @@ -380,6 +380,27 @@ func (s *lwdStreamer) GetTaddressBalanceStream(addresses walletrpc.CompactTxStre return nil } +func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.CompactTxStreamer_GetMempoolStreamServer) error { + ch := make(chan *walletrpc.RawTransaction, 200) + go common.AddNewClient(ch) + + for { + select { + case rtx, more := <-ch: + if !more || rtx == nil { + return nil + } + + if resp.Send(rtx) != nil { + return nil + } + // Timeout after 5 mins + case <-time.After(5 * time.Minute): + return nil + } + } +} + // Key is 32-byte txid (as a 64-character string), data is pointer to compact tx. var mempoolMap *map[string]*walletrpc.CompactTx var mempoolList []string diff --git a/walletrpc/service.proto b/walletrpc/service.proto index 8021e3f..e43ec66 100644 --- a/walletrpc/service.proto +++ b/walletrpc/service.proto @@ -164,6 +164,10 @@ service CompactTxStreamer { // in the exclude list that don't exist in the mempool are ignored. rpc GetMempoolTx(Exclude) returns (stream CompactTx) {} + // Return a stream of current Mempool transactions. This will keep the output stream open while + // there are mempool transactions. It will close the returned stream when a new block is mined. + rpc GetMempoolStream(Empty) returns (stream RawTransaction) {} + // GetTreeState returns the note commitment tree state corresponding to the given block. // See section 3.7 of the Zcash protocol specification. It returns several other useful // values also (even though they can be obtained using GetBlock).