From d461c187856703bab65e3b22fce39322ab918fa5 Mon Sep 17 00:00:00 2001 From: adityapk00 <31996805+adityapk00@users.noreply.github.com> Date: Wed, 14 Jul 2021 14:46:20 -0700 Subject: [PATCH] Mempool (#10) * mempool streaming * Fix deadlock * return correct height * Add mempool stream timeout * buffer channel * Mempool API * comment --- cmd/root.go | 7 ++ common/mempool.go | 178 +++++++++++++++++++++++++++++++++++ common/prometheusmetrics.go | 6 ++ frontend/service.go | 21 +++++ walletrpc/service.pb.go | 119 ++++++++++++----------- walletrpc/service.proto | 4 + walletrpc/service_grpc.pb.go | 65 ++++++++++++- 7 files changed, 343 insertions(+), 57 deletions(-) create mode 100644 common/mempool.go diff --git a/cmd/root.go b/cmd/root.go index dcdc910..70387a2 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -126,6 +126,7 @@ func startServer(opts *common.Options) error { promRegistry.MustRegister(common.Metrics.SendTransactionsCounter) promRegistry.MustRegister(common.Metrics.TotalSaplingParamsCounter) promRegistry.MustRegister(common.Metrics.TotalSproutParamsCounter) + promRegistry.MustRegister(common.Metrics.MempoolClientsGauge) promRegistry.MustRegister(common.Metrics.ZecPriceGauge) promRegistry.MustRegister(common.Metrics.ZecPriceHistoryWebAPICounter) promRegistry.MustRegister(common.Metrics.ZecPriceHistoryErrors) @@ -285,6 +286,10 @@ func startServer(opts *common.Options) error { // Initialize price fetcher common.StartPriceFetcher(dbPath, chainName) + // Initialize mempool monitor + exitMempool := make(chan bool) + common.StartMempoolMonitor(cache, exitMempool) + // Start listening listener, err := net.Listen("tcp", opts.GRPCBindAddr) if err != nil { @@ -303,6 +308,8 @@ func startServer(opts *common.Options) error { common.Log.WithFields(logrus.Fields{ "signal": s.String(), }).Info("caught signal, stopping gRPC server") + + exitMempool <- true os.Exit(1) }() diff --git a/common/mempool.go b/common/mempool.go new file mode 100644 index 0000000..9c19473 --- /dev/null +++ b/common/mempool.go @@ -0,0 +1,178 @@ +package common + +import ( + "encoding/hex" + "encoding/json" + "sync" + "sync/atomic" + "time" + + "github.com/adityapk00/lightwalletd/walletrpc" +) + +var ( + // List of all mempool transactions + txns map[string]*walletrpc.RawTransaction = make(map[string]*walletrpc.RawTransaction) + + // List of all clients waiting to recieve mempool txns + clients []chan<- *walletrpc.RawTransaction + + // Last height of the blocks. If this changes, then close all the clients and flush the mempool + lastHeight int + + // A pointer to the blockcache + blockcache *BlockCache + + // Mutex to lock the above 2 structs + lock sync.Mutex + + // Since the mutex doesn't have a "try_lock" method, we'll have to improvize with this + refreshing int32 = 0 +) + +// AddNewClient adds a new client to the list of clients to notify for mempool txns +func AddNewClient(client chan<- *walletrpc.RawTransaction) { + lock.Lock() + defer lock.Unlock() + + //Log.Infoln("Adding new client, sending ", len(txns), " transactions") + + // Also send all pending mempool txns + for _, rtx := range txns { + if client != nil { + client <- rtx + } + } + + if client != nil { + clients = append(clients, client) + } + Metrics.MempoolClientsGauge.Set(float64(len(clients))) +} + +// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients +func refreshMempoolTxns() error { + Log.Infoln("Refreshing mempool") + + // First check if another refresh is running, if it is, just return + if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) { + Log.Warnln("Another refresh in progress, returning") + return nil + } + + // Set refreshing to 0 when we exit + defer func() { + refreshing = 0 + }() + + // Check if the blockchain has changed, and if it has, then clear everything + + lock.Lock() + defer lock.Unlock() + + if lastHeight < blockcache.GetLatestHeight() { + Log.Infoln("Block height changed, clearing everything") + + // Flush all the clients + for _, client := range clients { + if client != nil { + close(client) + } + } + + clients = make([]chan<- *walletrpc.RawTransaction, 0) + + // Clear txns + txns = make(map[string]*walletrpc.RawTransaction) + + lastHeight = blockcache.GetLatestHeight() + } + + var mempoolList []string + params := make([]json.RawMessage, 0) + result, rpcErr := RawRequest("getrawmempool", params) + if rpcErr != nil { + return rpcErr + } + err := json.Unmarshal(result, &mempoolList) + if err != nil { + return err + } + + //println("getrawmempool size ", len(mempoolList)) + + // Fetch all new mempool txns and add them into `newTxns` + for _, txidstr := range mempoolList { + if _, ok := txns[txidstr]; !ok { + txidJSON, err := json.Marshal(txidstr) + if err != nil { + return err + } + // The "0" is because we only need the raw hex, which is returned as + // just a hex string, and not even a json string (with quotes). + params := []json.RawMessage{txidJSON, json.RawMessage("0")} + result, rpcErr := RawRequest("getrawtransaction", params) + if rpcErr != nil { + // Not an error; mempool transactions can disappear + continue + } + // strip the quotes + var txStr string + err = json.Unmarshal(result, &txStr) + if err != nil { + return err + } + + // conver to binary + txBytes, err := hex.DecodeString(txStr) + if err != nil { + return err + } + + newRtx := &walletrpc.RawTransaction{ + Data: txBytes, + Height: uint64(lastHeight), + } + + // Notify waiting clients + for _, client := range clients { + if client != nil { + client <- newRtx + } + } + + Log.Infoln("Adding new mempool txid", txidstr, " sending to ", len(clients), " clients") + txns[txidstr] = newRtx + } + } + + return nil +} + +// StartMempoolMonitor starts monitoring the mempool +func StartMempoolMonitor(cache *BlockCache, done <-chan bool) { + go func() { + ticker := time.NewTicker(2 * time.Second) + blockcache = cache + lastHeight = blockcache.GetLatestHeight() + + for { + select { + case <-ticker.C: + go func() { + //Log.Infoln("Ticker triggered") + err := refreshMempoolTxns() + if err != nil { + Log.Errorln("Mempool refresh error:", err.Error()) + } + }() + + case <-done: + for _, client := range clients { + close(client) + } + return + } + } + }() +} diff --git a/common/prometheusmetrics.go b/common/prometheusmetrics.go index 56e8bc4..3782a80 100644 --- a/common/prometheusmetrics.go +++ b/common/prometheusmetrics.go @@ -10,6 +10,7 @@ type PrometheusMetrics struct { TotalErrors prometheus.Counter TotalSaplingParamsCounter prometheus.Counter TotalSproutParamsCounter prometheus.Counter + MempoolClientsGauge prometheus.Gauge ZecPriceGauge prometheus.Gauge ZecPriceHistoryWebAPICounter prometheus.Counter ZecPriceHistoryErrors prometheus.Counter @@ -52,6 +53,11 @@ func GetPrometheusMetrics() *PrometheusMetrics { Help: "Total number of params downloasd for sprout params", }) + m.MempoolClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{ + Name: "mempool_clients", + Help: "Number of concurrent mempool clients", + }) + m.ZecPriceGauge = prometheus.NewGauge(prometheus.GaugeOpts{ Name: "zec_price", Help: "Current price of Zec", diff --git a/frontend/service.go b/frontend/service.go index a699065..3dd2ee9 100644 --- a/frontend/service.go +++ b/frontend/service.go @@ -622,6 +622,27 @@ func (s *lwdStreamer) GetMempoolTx(exclude *walletrpc.Exclude, resp walletrpc.Co return nil } +func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.CompactTxStreamer_GetMempoolStreamServer) error { + ch := make(chan *walletrpc.RawTransaction, 200) + go common.AddNewClient(ch) + + for { + select { + case rtx, more := <-ch: + if !more || rtx == nil { + return nil + } + + if resp.Send(rtx) != nil { + return nil + } + // Timeout after 5 mins + case <-time.After(5 * time.Minute): + return nil + } + } +} + // Return the subset of items that aren't excluded, but // if more than one item matches an exclude entry, return // all those items. diff --git a/walletrpc/service.pb.go b/walletrpc/service.pb.go index b6df6ec..2f09ebc 100644 --- a/walletrpc/service.pb.go +++ b/walletrpc/service.pb.go @@ -1438,7 +1438,7 @@ var file_service_proto_rawDesc = []byte{ 0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01, - 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x32, 0xf3, 0x0b, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, + 0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x32, 0xd0, 0x0c, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x54, 0x78, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x12, 0x54, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, @@ -1504,39 +1504,44 @@ var file_service_proto_rawDesc = []byte{ 0x63, 0x2e, 0x45, 0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x1a, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x54, 0x78, 0x22, 0x00, 0x30, 0x01, 0x12, - 0x52, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, - 0x1e, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, - 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x44, 0x1a, - 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, - 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, - 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, - 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, - 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, - 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, - 0x67, 0x1a, 0x2f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, - 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, - 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x4c, 0x69, - 0x73, 0x74, 0x22, 0x00, 0x12, 0x73, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, - 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e, - 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, - 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, - 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, - 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, - 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0d, 0x47, 0x65, 0x74, - 0x4c, 0x69, 0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, + 0x5b, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x6d, 0x70, 0x6f, 0x6f, 0x6c, 0x53, 0x74, 0x72, + 0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, + 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x45, 0x6d, 0x70, 0x74, + 0x79, 0x1a, 0x25, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, + 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x61, 0x77, 0x54, 0x72, 0x61, + 0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0c, + 0x47, 0x65, 0x74, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1e, 0x2e, 0x63, + 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, + 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x44, 0x1a, 0x20, 0x2e, 0x63, + 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, + 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x00, + 0x12, 0x6f, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, + 0x78, 0x6f, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, + 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, + 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2f, + 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, + 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, + 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x4c, 0x69, 0x73, 0x74, 0x22, + 0x00, 0x12, 0x73, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, + 0x74, 0x78, 0x6f, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, - 0x70, 0x63, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, - 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, - 0x2e, 0x4c, 0x69, 0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, - 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, - 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x75, - 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, - 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x50, - 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x1b, 0x5a, - 0x16, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x64, 0x2f, 0x77, 0x61, - 0x6c, 0x6c, 0x65, 0x74, 0x72, 0x70, 0x63, 0xba, 0x02, 0x00, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, - 0x6f, 0x33, + 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, + 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, + 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, + 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x67, + 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, + 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, + 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, + 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, + 0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x04, 0x50, 0x69, + 0x6e, 0x67, 0x12, 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, + 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x1a, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, + 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x50, 0x69, 0x6e, 0x67, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x1b, 0x5a, 0x16, 0x6c, 0x69, + 0x67, 0x68, 0x74, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x64, 0x2f, 0x77, 0x61, 0x6c, 0x6c, 0x65, + 0x74, 0x72, 0x70, 0x63, 0xba, 0x02, 0x00, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -1594,29 +1599,31 @@ var file_service_proto_depIdxs = []int32{ 12, // 13: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:input_type -> cash.z.wallet.sdk.rpc.AddressList 11, // 14: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:input_type -> cash.z.wallet.sdk.rpc.Address 14, // 15: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:input_type -> cash.z.wallet.sdk.rpc.Exclude - 0, // 16: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:input_type -> cash.z.wallet.sdk.rpc.BlockID - 16, // 17: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg - 16, // 18: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg - 6, // 19: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:input_type -> cash.z.wallet.sdk.rpc.Empty - 9, // 20: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:input_type -> cash.z.wallet.sdk.rpc.Duration - 0, // 21: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLatestBlock:output_type -> cash.z.wallet.sdk.rpc.BlockID - 21, // 22: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlock:output_type -> cash.z.wallet.sdk.rpc.CompactBlock - 21, // 23: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlockRange:output_type -> cash.z.wallet.sdk.rpc.CompactBlock - 20, // 24: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse - 20, // 25: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetCurrentZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse - 3, // 26: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTransaction:output_type -> cash.z.wallet.sdk.rpc.RawTransaction - 4, // 27: cash.z.wallet.sdk.rpc.CompactTxStreamer.SendTransaction:output_type -> cash.z.wallet.sdk.rpc.SendResponse - 3, // 28: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressTxids:output_type -> cash.z.wallet.sdk.rpc.RawTransaction - 13, // 29: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:output_type -> cash.z.wallet.sdk.rpc.Balance - 13, // 30: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:output_type -> cash.z.wallet.sdk.rpc.Balance - 22, // 31: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:output_type -> cash.z.wallet.sdk.rpc.CompactTx - 15, // 32: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:output_type -> cash.z.wallet.sdk.rpc.TreeState - 18, // 33: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReplyList - 17, // 34: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReply - 7, // 35: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:output_type -> cash.z.wallet.sdk.rpc.LightdInfo - 10, // 36: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:output_type -> cash.z.wallet.sdk.rpc.PingResponse - 21, // [21:37] is the sub-list for method output_type - 5, // [5:21] is the sub-list for method input_type + 6, // 16: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolStream:input_type -> cash.z.wallet.sdk.rpc.Empty + 0, // 17: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:input_type -> cash.z.wallet.sdk.rpc.BlockID + 16, // 18: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg + 16, // 19: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg + 6, // 20: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:input_type -> cash.z.wallet.sdk.rpc.Empty + 9, // 21: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:input_type -> cash.z.wallet.sdk.rpc.Duration + 0, // 22: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLatestBlock:output_type -> cash.z.wallet.sdk.rpc.BlockID + 21, // 23: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlock:output_type -> cash.z.wallet.sdk.rpc.CompactBlock + 21, // 24: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlockRange:output_type -> cash.z.wallet.sdk.rpc.CompactBlock + 20, // 25: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse + 20, // 26: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetCurrentZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse + 3, // 27: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTransaction:output_type -> cash.z.wallet.sdk.rpc.RawTransaction + 4, // 28: cash.z.wallet.sdk.rpc.CompactTxStreamer.SendTransaction:output_type -> cash.z.wallet.sdk.rpc.SendResponse + 3, // 29: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressTxids:output_type -> cash.z.wallet.sdk.rpc.RawTransaction + 13, // 30: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:output_type -> cash.z.wallet.sdk.rpc.Balance + 13, // 31: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:output_type -> cash.z.wallet.sdk.rpc.Balance + 22, // 32: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:output_type -> cash.z.wallet.sdk.rpc.CompactTx + 3, // 33: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolStream:output_type -> cash.z.wallet.sdk.rpc.RawTransaction + 15, // 34: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:output_type -> cash.z.wallet.sdk.rpc.TreeState + 18, // 35: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReplyList + 17, // 36: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReply + 7, // 37: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:output_type -> cash.z.wallet.sdk.rpc.LightdInfo + 10, // 38: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:output_type -> cash.z.wallet.sdk.rpc.PingResponse + 22, // [22:39] is the sub-list for method output_type + 5, // [5:22] is the sub-list for method input_type 5, // [5:5] is the sub-list for extension type_name 5, // [5:5] is the sub-list for extension extendee 0, // [0:5] is the sub-list for field type_name diff --git a/walletrpc/service.proto b/walletrpc/service.proto index efbe345..b99e7f9 100644 --- a/walletrpc/service.proto +++ b/walletrpc/service.proto @@ -188,6 +188,10 @@ service CompactTxStreamer { // in the exclude list that don't exist in the mempool are ignored. rpc GetMempoolTx(Exclude) returns (stream CompactTx) {} + // Return a stream of current Mempool transactions. This will keep the output stream open while + // there are mempool transactions. It will close the returned stream when a new block is mined. + rpc GetMempoolStream(Empty) returns (stream RawTransaction) {} + // GetTreeState returns the note commitment tree state corresponding to the given block. // See section 3.7 of the Zcash protocol specification. It returns several other useful // values also (even though they can be obtained using GetBlock). diff --git a/walletrpc/service_grpc.pb.go b/walletrpc/service_grpc.pb.go index 222f3cc..3500757 100644 --- a/walletrpc/service_grpc.pb.go +++ b/walletrpc/service_grpc.pb.go @@ -45,6 +45,7 @@ type CompactTxStreamerClient interface { // match a shortened txid, they are all sent (none is excluded). Transactions // in the exclude list that don't exist in the mempool are ignored. GetMempoolTx(ctx context.Context, in *Exclude, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolTxClient, error) + GetMempoolStream(ctx context.Context, in *Empty, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolStreamClient, error) // GetTreeState returns the note commitment tree state corresponding to the given block. // See section 3.7 of the Zcash protocol specification. It returns several other useful // values also (even though they can be obtained using GetBlock). @@ -259,6 +260,38 @@ func (x *compactTxStreamerGetMempoolTxClient) Recv() (*CompactTx, error) { return m, nil } +func (c *compactTxStreamerClient) GetMempoolStream(ctx context.Context, in *Empty, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolStreamClient, error) { + stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[4], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetMempoolStream", opts...) + if err != nil { + return nil, err + } + x := &compactTxStreamerGetMempoolStreamClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type CompactTxStreamer_GetMempoolStreamClient interface { + Recv() (*RawTransaction, error) + grpc.ClientStream +} + +type compactTxStreamerGetMempoolStreamClient struct { + grpc.ClientStream +} + +func (x *compactTxStreamerGetMempoolStreamClient) Recv() (*RawTransaction, error) { + m := new(RawTransaction) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + func (c *compactTxStreamerClient) GetTreeState(ctx context.Context, in *BlockID, opts ...grpc.CallOption) (*TreeState, error) { out := new(TreeState) err := c.cc.Invoke(ctx, "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetTreeState", in, out, opts...) @@ -278,7 +311,7 @@ func (c *compactTxStreamerClient) GetAddressUtxos(ctx context.Context, in *GetAd } func (c *compactTxStreamerClient) GetAddressUtxosStream(ctx context.Context, in *GetAddressUtxosArg, opts ...grpc.CallOption) (CompactTxStreamer_GetAddressUtxosStreamClient, error) { - stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[4], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetAddressUtxosStream", opts...) + stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[5], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetAddressUtxosStream", opts...) if err != nil { return nil, err } @@ -358,6 +391,7 @@ type CompactTxStreamerServer interface { // match a shortened txid, they are all sent (none is excluded). Transactions // in the exclude list that don't exist in the mempool are ignored. GetMempoolTx(*Exclude, CompactTxStreamer_GetMempoolTxServer) error + GetMempoolStream(*Empty, CompactTxStreamer_GetMempoolStreamServer) error // GetTreeState returns the note commitment tree state corresponding to the given block. // See section 3.7 of the Zcash protocol specification. It returns several other useful // values also (even though they can be obtained using GetBlock). @@ -409,6 +443,9 @@ func (UnimplementedCompactTxStreamerServer) GetTaddressBalanceStream(CompactTxSt func (UnimplementedCompactTxStreamerServer) GetMempoolTx(*Exclude, CompactTxStreamer_GetMempoolTxServer) error { return status.Errorf(codes.Unimplemented, "method GetMempoolTx not implemented") } +func (UnimplementedCompactTxStreamerServer) GetMempoolStream(*Empty, CompactTxStreamer_GetMempoolStreamServer) error { + return status.Errorf(codes.Unimplemented, "method GetMempoolStream not implemented") +} func (UnimplementedCompactTxStreamerServer) GetTreeState(context.Context, *BlockID) (*TreeState, error) { return nil, status.Errorf(codes.Unimplemented, "method GetTreeState not implemented") } @@ -652,6 +689,27 @@ func (x *compactTxStreamerGetMempoolTxServer) Send(m *CompactTx) error { return x.ServerStream.SendMsg(m) } +func _CompactTxStreamer_GetMempoolStream_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(Empty) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(CompactTxStreamerServer).GetMempoolStream(m, &compactTxStreamerGetMempoolStreamServer{stream}) +} + +type CompactTxStreamer_GetMempoolStreamServer interface { + Send(*RawTransaction) error + grpc.ServerStream +} + +type compactTxStreamerGetMempoolStreamServer struct { + grpc.ServerStream +} + +func (x *compactTxStreamerGetMempoolStreamServer) Send(m *RawTransaction) error { + return x.ServerStream.SendMsg(m) +} + func _CompactTxStreamer_GetTreeState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) { in := new(BlockID) if err := dec(in); err != nil { @@ -818,6 +876,11 @@ var CompactTxStreamer_ServiceDesc = grpc.ServiceDesc{ Handler: _CompactTxStreamer_GetMempoolTx_Handler, ServerStreams: true, }, + { + StreamName: "GetMempoolStream", + Handler: _CompactTxStreamer_GetMempoolStream_Handler, + ServerStreams: true, + }, { StreamName: "GetAddressUtxosStream", Handler: _CompactTxStreamer_GetAddressUtxosStream_Handler,