Mempool Streaming API
This commit is contained in:
parent
3f669c3d19
commit
c3e1b98d4f
|
@ -267,6 +267,10 @@ func startServer(opts *common.Options) error {
|
|||
walletrpc.RegisterDarksideStreamerServer(server, service)
|
||||
}
|
||||
|
||||
// Initialize mempool monitor
|
||||
exitMempool := make(chan bool)
|
||||
common.StartMempoolMonitor(cache, exitMempool)
|
||||
|
||||
// Start listening
|
||||
listener, err := net.Listen("tcp", opts.GRPCBindAddr)
|
||||
if err != nil {
|
||||
|
@ -282,6 +286,7 @@ func startServer(opts *common.Options) error {
|
|||
go func() {
|
||||
s := <-signals
|
||||
cache.Sync()
|
||||
exitMempool <- true
|
||||
common.Log.WithFields(logrus.Fields{
|
||||
"signal": s.String(),
|
||||
}).Info("caught signal, stopping gRPC server")
|
||||
|
|
|
@ -0,0 +1,177 @@
|
|||
package common
|
||||
|
||||
import (
|
||||
"encoding/hex"
|
||||
"encoding/json"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/zcash/lightwalletd/walletrpc"
|
||||
)
|
||||
|
||||
var (
|
||||
// List of all mempool transactions
|
||||
txns map[string]*walletrpc.RawTransaction = make(map[string]*walletrpc.RawTransaction)
|
||||
|
||||
// List of all clients waiting to recieve mempool txns
|
||||
clients []chan<- *walletrpc.RawTransaction
|
||||
|
||||
// Last height of the blocks. If this changes, then close all the clients and flush the mempool
|
||||
lastHeight int
|
||||
|
||||
// A pointer to the blockcache
|
||||
blockcache *BlockCache
|
||||
|
||||
// Mutex to lock the above 2 structs
|
||||
lock sync.Mutex
|
||||
|
||||
// Since the mutex doesn't have a "try_lock" method, we'll have to improvize with this
|
||||
refreshing int32 = 0
|
||||
)
|
||||
|
||||
// AddNewClient adds a new client to the list of clients to notify for mempool txns
|
||||
func AddNewClient(client chan<- *walletrpc.RawTransaction) {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
//Log.Infoln("Adding new client, sending ", len(txns), " transactions")
|
||||
|
||||
// Also send all pending mempool txns
|
||||
for _, rtx := range txns {
|
||||
if client != nil {
|
||||
client <- rtx
|
||||
}
|
||||
}
|
||||
|
||||
if client != nil {
|
||||
clients = append(clients, client)
|
||||
}
|
||||
}
|
||||
|
||||
// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients
|
||||
func refreshMempoolTxns() error {
|
||||
Log.Infoln("Refreshing mempool")
|
||||
|
||||
// First check if another refresh is running, if it is, just return
|
||||
if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) {
|
||||
Log.Warnln("Another refresh in progress, returning")
|
||||
return nil
|
||||
}
|
||||
|
||||
// Set refreshing to 0 when we exit
|
||||
defer func() {
|
||||
refreshing = 0
|
||||
}()
|
||||
|
||||
// Check if the blockchain has changed, and if it has, then clear everything
|
||||
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
|
||||
if lastHeight < blockcache.GetLatestHeight() {
|
||||
Log.Infoln("Block height changed, clearing everything")
|
||||
|
||||
// Flush all the clients
|
||||
for _, client := range clients {
|
||||
if client != nil {
|
||||
close(client)
|
||||
}
|
||||
}
|
||||
|
||||
clients = make([]chan<- *walletrpc.RawTransaction, 0)
|
||||
|
||||
// Clear txns
|
||||
txns = make(map[string]*walletrpc.RawTransaction)
|
||||
|
||||
lastHeight = blockcache.GetLatestHeight()
|
||||
}
|
||||
|
||||
var mempoolList []string
|
||||
params := make([]json.RawMessage, 0)
|
||||
result, rpcErr := RawRequest("getrawmempool", params)
|
||||
if rpcErr != nil {
|
||||
return rpcErr
|
||||
}
|
||||
err := json.Unmarshal(result, &mempoolList)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
//println("getrawmempool size ", len(mempoolList))
|
||||
|
||||
// Fetch all new mempool txns and add them into `newTxns`
|
||||
for _, txidstr := range mempoolList {
|
||||
if _, ok := txns[txidstr]; !ok {
|
||||
txidJSON, err := json.Marshal(txidstr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
// The "0" is because we only need the raw hex, which is returned as
|
||||
// just a hex string, and not even a json string (with quotes).
|
||||
params := []json.RawMessage{txidJSON, json.RawMessage("0")}
|
||||
result, rpcErr := RawRequest("getrawtransaction", params)
|
||||
if rpcErr != nil {
|
||||
// Not an error; mempool transactions can disappear
|
||||
continue
|
||||
}
|
||||
// strip the quotes
|
||||
var txStr string
|
||||
err = json.Unmarshal(result, &txStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// conver to binary
|
||||
txBytes, err := hex.DecodeString(txStr)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
newRtx := &walletrpc.RawTransaction{
|
||||
Data: txBytes,
|
||||
Height: uint64(lastHeight),
|
||||
}
|
||||
|
||||
// Notify waiting clients
|
||||
for _, client := range clients {
|
||||
if client != nil {
|
||||
client <- newRtx
|
||||
}
|
||||
}
|
||||
|
||||
Log.Infoln("Adding new mempool txid", txidstr, " sending to ", len(clients), " clients")
|
||||
txns[txidstr] = newRtx
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// StartMempoolMonitor starts monitoring the mempool
|
||||
func StartMempoolMonitor(cache *BlockCache, done <-chan bool) {
|
||||
go func() {
|
||||
ticker := time.NewTicker(2 * time.Second)
|
||||
blockcache = cache
|
||||
lastHeight = blockcache.GetLatestHeight()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
go func() {
|
||||
//Log.Infoln("Ticker triggered")
|
||||
err := refreshMempoolTxns()
|
||||
if err != nil {
|
||||
Log.Errorln("Mempool refresh error:", err.Error())
|
||||
}
|
||||
}()
|
||||
|
||||
case <-done:
|
||||
for _, client := range clients {
|
||||
close(client)
|
||||
}
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
|
@ -380,6 +380,27 @@ func (s *lwdStreamer) GetTaddressBalanceStream(addresses walletrpc.CompactTxStre
|
|||
return nil
|
||||
}
|
||||
|
||||
func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.CompactTxStreamer_GetMempoolStreamServer) error {
|
||||
ch := make(chan *walletrpc.RawTransaction, 200)
|
||||
go common.AddNewClient(ch)
|
||||
|
||||
for {
|
||||
select {
|
||||
case rtx, more := <-ch:
|
||||
if !more || rtx == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
if resp.Send(rtx) != nil {
|
||||
return nil
|
||||
}
|
||||
// Timeout after 5 mins
|
||||
case <-time.After(5 * time.Minute):
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Key is 32-byte txid (as a 64-character string), data is pointer to compact tx.
|
||||
var mempoolMap *map[string]*walletrpc.CompactTx
|
||||
var mempoolList []string
|
||||
|
|
|
@ -164,6 +164,10 @@ service CompactTxStreamer {
|
|||
// in the exclude list that don't exist in the mempool are ignored.
|
||||
rpc GetMempoolTx(Exclude) returns (stream CompactTx) {}
|
||||
|
||||
// Return a stream of current Mempool transactions. This will keep the output stream open while
|
||||
// there are mempool transactions. It will close the returned stream when a new block is mined.
|
||||
rpc GetMempoolStream(Empty) returns (stream RawTransaction) {}
|
||||
|
||||
// GetTreeState returns the note commitment tree state corresponding to the given block.
|
||||
// See section 3.7 of the Zcash protocol specification. It returns several other useful
|
||||
// values also (even though they can be obtained using GetBlock).
|
||||
|
|
Loading…
Reference in New Issue