* mempool streaming

* Fix deadlock

* return correct height

* Add mempool stream timeout

* buffer channel

* Mempool API

* comment
This commit is contained in:
adityapk00 2021-07-14 14:46:20 -07:00 committed by GitHub
parent a56af1cfbf
commit d461c18785
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 343 additions and 57 deletions

View File

@ -126,6 +126,7 @@ func startServer(opts *common.Options) error {
promRegistry.MustRegister(common.Metrics.SendTransactionsCounter)
promRegistry.MustRegister(common.Metrics.TotalSaplingParamsCounter)
promRegistry.MustRegister(common.Metrics.TotalSproutParamsCounter)
promRegistry.MustRegister(common.Metrics.MempoolClientsGauge)
promRegistry.MustRegister(common.Metrics.ZecPriceGauge)
promRegistry.MustRegister(common.Metrics.ZecPriceHistoryWebAPICounter)
promRegistry.MustRegister(common.Metrics.ZecPriceHistoryErrors)
@ -285,6 +286,10 @@ func startServer(opts *common.Options) error {
// Initialize price fetcher
common.StartPriceFetcher(dbPath, chainName)
// Initialize mempool monitor
exitMempool := make(chan bool)
common.StartMempoolMonitor(cache, exitMempool)
// Start listening
listener, err := net.Listen("tcp", opts.GRPCBindAddr)
if err != nil {
@ -303,6 +308,8 @@ func startServer(opts *common.Options) error {
common.Log.WithFields(logrus.Fields{
"signal": s.String(),
}).Info("caught signal, stopping gRPC server")
exitMempool <- true
os.Exit(1)
}()

178
common/mempool.go Normal file
View File

@ -0,0 +1,178 @@
package common
import (
"encoding/hex"
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/adityapk00/lightwalletd/walletrpc"
)
var (
// List of all mempool transactions
txns map[string]*walletrpc.RawTransaction = make(map[string]*walletrpc.RawTransaction)
// List of all clients waiting to recieve mempool txns
clients []chan<- *walletrpc.RawTransaction
// Last height of the blocks. If this changes, then close all the clients and flush the mempool
lastHeight int
// A pointer to the blockcache
blockcache *BlockCache
// Mutex to lock the above 2 structs
lock sync.Mutex
// Since the mutex doesn't have a "try_lock" method, we'll have to improvize with this
refreshing int32 = 0
)
// AddNewClient adds a new client to the list of clients to notify for mempool txns
func AddNewClient(client chan<- *walletrpc.RawTransaction) {
lock.Lock()
defer lock.Unlock()
//Log.Infoln("Adding new client, sending ", len(txns), " transactions")
// Also send all pending mempool txns
for _, rtx := range txns {
if client != nil {
client <- rtx
}
}
if client != nil {
clients = append(clients, client)
}
Metrics.MempoolClientsGauge.Set(float64(len(clients)))
}
// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients
func refreshMempoolTxns() error {
Log.Infoln("Refreshing mempool")
// First check if another refresh is running, if it is, just return
if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) {
Log.Warnln("Another refresh in progress, returning")
return nil
}
// Set refreshing to 0 when we exit
defer func() {
refreshing = 0
}()
// Check if the blockchain has changed, and if it has, then clear everything
lock.Lock()
defer lock.Unlock()
if lastHeight < blockcache.GetLatestHeight() {
Log.Infoln("Block height changed, clearing everything")
// Flush all the clients
for _, client := range clients {
if client != nil {
close(client)
}
}
clients = make([]chan<- *walletrpc.RawTransaction, 0)
// Clear txns
txns = make(map[string]*walletrpc.RawTransaction)
lastHeight = blockcache.GetLatestHeight()
}
var mempoolList []string
params := make([]json.RawMessage, 0)
result, rpcErr := RawRequest("getrawmempool", params)
if rpcErr != nil {
return rpcErr
}
err := json.Unmarshal(result, &mempoolList)
if err != nil {
return err
}
//println("getrawmempool size ", len(mempoolList))
// Fetch all new mempool txns and add them into `newTxns`
for _, txidstr := range mempoolList {
if _, ok := txns[txidstr]; !ok {
txidJSON, err := json.Marshal(txidstr)
if err != nil {
return err
}
// The "0" is because we only need the raw hex, which is returned as
// just a hex string, and not even a json string (with quotes).
params := []json.RawMessage{txidJSON, json.RawMessage("0")}
result, rpcErr := RawRequest("getrawtransaction", params)
if rpcErr != nil {
// Not an error; mempool transactions can disappear
continue
}
// strip the quotes
var txStr string
err = json.Unmarshal(result, &txStr)
if err != nil {
return err
}
// conver to binary
txBytes, err := hex.DecodeString(txStr)
if err != nil {
return err
}
newRtx := &walletrpc.RawTransaction{
Data: txBytes,
Height: uint64(lastHeight),
}
// Notify waiting clients
for _, client := range clients {
if client != nil {
client <- newRtx
}
}
Log.Infoln("Adding new mempool txid", txidstr, " sending to ", len(clients), " clients")
txns[txidstr] = newRtx
}
}
return nil
}
// StartMempoolMonitor starts monitoring the mempool
func StartMempoolMonitor(cache *BlockCache, done <-chan bool) {
go func() {
ticker := time.NewTicker(2 * time.Second)
blockcache = cache
lastHeight = blockcache.GetLatestHeight()
for {
select {
case <-ticker.C:
go func() {
//Log.Infoln("Ticker triggered")
err := refreshMempoolTxns()
if err != nil {
Log.Errorln("Mempool refresh error:", err.Error())
}
}()
case <-done:
for _, client := range clients {
close(client)
}
return
}
}
}()
}

View File

@ -10,6 +10,7 @@ type PrometheusMetrics struct {
TotalErrors prometheus.Counter
TotalSaplingParamsCounter prometheus.Counter
TotalSproutParamsCounter prometheus.Counter
MempoolClientsGauge prometheus.Gauge
ZecPriceGauge prometheus.Gauge
ZecPriceHistoryWebAPICounter prometheus.Counter
ZecPriceHistoryErrors prometheus.Counter
@ -52,6 +53,11 @@ func GetPrometheusMetrics() *PrometheusMetrics {
Help: "Total number of params downloasd for sprout params",
})
m.MempoolClientsGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "mempool_clients",
Help: "Number of concurrent mempool clients",
})
m.ZecPriceGauge = prometheus.NewGauge(prometheus.GaugeOpts{
Name: "zec_price",
Help: "Current price of Zec",

View File

@ -622,6 +622,27 @@ func (s *lwdStreamer) GetMempoolTx(exclude *walletrpc.Exclude, resp walletrpc.Co
return nil
}
func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.CompactTxStreamer_GetMempoolStreamServer) error {
ch := make(chan *walletrpc.RawTransaction, 200)
go common.AddNewClient(ch)
for {
select {
case rtx, more := <-ch:
if !more || rtx == nil {
return nil
}
if resp.Send(rtx) != nil {
return nil
}
// Timeout after 5 mins
case <-time.After(5 * time.Minute):
return nil
}
}
}
// Return the subset of items that aren't excluded, but
// if more than one item matches an exclude entry, return
// all those items.

View File

@ -1438,7 +1438,7 @@ var file_service_proto_rawDesc = []byte{
0x73, 0x74, 0x61, 0x6d, 0x70, 0x12, 0x1a, 0x0a, 0x08, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63,
0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63,
0x79, 0x12, 0x14, 0x0a, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x18, 0x03, 0x20, 0x01, 0x28, 0x01,
0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x32, 0xf3, 0x0b, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70,
0x52, 0x05, 0x70, 0x72, 0x69, 0x63, 0x65, 0x32, 0xd0, 0x0c, 0x0a, 0x11, 0x43, 0x6f, 0x6d, 0x70,
0x61, 0x63, 0x74, 0x54, 0x78, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x65, 0x72, 0x12, 0x54, 0x0a,
0x0e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x12,
0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e,
@ -1504,39 +1504,44 @@ var file_service_proto_rawDesc = []byte{
0x63, 0x2e, 0x45, 0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x1a, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68,
0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x54, 0x78, 0x22, 0x00, 0x30, 0x01, 0x12,
0x52, 0x0a, 0x0c, 0x47, 0x65, 0x74, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12,
0x1e, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e,
0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x44, 0x1a,
0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e,
0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74,
0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e,
0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47,
0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72,
0x67, 0x1a, 0x2f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65,
0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64,
0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x4c, 0x69,
0x73, 0x74, 0x22, 0x00, 0x12, 0x73, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65,
0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e,
0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64,
0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73,
0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e,
0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63,
0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73,
0x52, 0x65, 0x70, 0x6c, 0x79, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0d, 0x47, 0x65, 0x74,
0x4c, 0x69, 0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73,
0x5b, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4d, 0x65, 0x6d, 0x70, 0x6f, 0x6f, 0x6c, 0x53, 0x74, 0x72,
0x65, 0x61, 0x6d, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c,
0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x45, 0x6d, 0x70, 0x74,
0x79, 0x1a, 0x25, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65,
0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x61, 0x77, 0x54, 0x72, 0x61,
0x6e, 0x73, 0x61, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0c,
0x47, 0x65, 0x74, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1e, 0x2e, 0x63,
0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x44, 0x1a, 0x20, 0x2e, 0x63,
0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x00,
0x12, 0x6f, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74,
0x78, 0x6f, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c,
0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2f,
0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73,
0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x4c, 0x69, 0x73, 0x74, 0x22,
0x00, 0x12, 0x73, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55,
0x74, 0x78, 0x6f, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73,
0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e,
0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63,
0x2e, 0x4c, 0x69, 0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a,
0x04, 0x50, 0x69, 0x6e, 0x67, 0x12, 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77,
0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x75,
0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x1a, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e,
0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x50,
0x69, 0x6e, 0x67, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x1b, 0x5a,
0x16, 0x6c, 0x69, 0x67, 0x68, 0x74, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x64, 0x2f, 0x77, 0x61,
0x6c, 0x6c, 0x65, 0x74, 0x72, 0x70, 0x63, 0xba, 0x02, 0x00, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x33,
0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78,
0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77,
0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65,
0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70,
0x6c, 0x79, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x67,
0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a,
0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77,
0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69,
0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x04, 0x50, 0x69,
0x6e, 0x67, 0x12, 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c,
0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x1a, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c,
0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x50, 0x69, 0x6e, 0x67,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x1b, 0x5a, 0x16, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x64, 0x2f, 0x77, 0x61, 0x6c, 0x6c, 0x65,
0x74, 0x72, 0x70, 0x63, 0xba, 0x02, 0x00, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1594,29 +1599,31 @@ var file_service_proto_depIdxs = []int32{
12, // 13: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:input_type -> cash.z.wallet.sdk.rpc.AddressList
11, // 14: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:input_type -> cash.z.wallet.sdk.rpc.Address
14, // 15: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:input_type -> cash.z.wallet.sdk.rpc.Exclude
0, // 16: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:input_type -> cash.z.wallet.sdk.rpc.BlockID
16, // 17: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
16, // 18: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
6, // 19: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:input_type -> cash.z.wallet.sdk.rpc.Empty
9, // 20: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:input_type -> cash.z.wallet.sdk.rpc.Duration
0, // 21: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLatestBlock:output_type -> cash.z.wallet.sdk.rpc.BlockID
21, // 22: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlock:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
21, // 23: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlockRange:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
20, // 24: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse
20, // 25: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetCurrentZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse
3, // 26: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTransaction:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
4, // 27: cash.z.wallet.sdk.rpc.CompactTxStreamer.SendTransaction:output_type -> cash.z.wallet.sdk.rpc.SendResponse
3, // 28: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressTxids:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
13, // 29: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:output_type -> cash.z.wallet.sdk.rpc.Balance
13, // 30: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:output_type -> cash.z.wallet.sdk.rpc.Balance
22, // 31: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:output_type -> cash.z.wallet.sdk.rpc.CompactTx
15, // 32: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:output_type -> cash.z.wallet.sdk.rpc.TreeState
18, // 33: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReplyList
17, // 34: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReply
7, // 35: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:output_type -> cash.z.wallet.sdk.rpc.LightdInfo
10, // 36: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:output_type -> cash.z.wallet.sdk.rpc.PingResponse
21, // [21:37] is the sub-list for method output_type
5, // [5:21] is the sub-list for method input_type
6, // 16: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolStream:input_type -> cash.z.wallet.sdk.rpc.Empty
0, // 17: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:input_type -> cash.z.wallet.sdk.rpc.BlockID
16, // 18: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
16, // 19: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
6, // 20: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:input_type -> cash.z.wallet.sdk.rpc.Empty
9, // 21: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:input_type -> cash.z.wallet.sdk.rpc.Duration
0, // 22: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLatestBlock:output_type -> cash.z.wallet.sdk.rpc.BlockID
21, // 23: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlock:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
21, // 24: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlockRange:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
20, // 25: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse
20, // 26: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetCurrentZECPrice:output_type -> cash.z.wallet.sdk.rpc.PriceResponse
3, // 27: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTransaction:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
4, // 28: cash.z.wallet.sdk.rpc.CompactTxStreamer.SendTransaction:output_type -> cash.z.wallet.sdk.rpc.SendResponse
3, // 29: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressTxids:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
13, // 30: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:output_type -> cash.z.wallet.sdk.rpc.Balance
13, // 31: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:output_type -> cash.z.wallet.sdk.rpc.Balance
22, // 32: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:output_type -> cash.z.wallet.sdk.rpc.CompactTx
3, // 33: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolStream:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
15, // 34: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:output_type -> cash.z.wallet.sdk.rpc.TreeState
18, // 35: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReplyList
17, // 36: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReply
7, // 37: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:output_type -> cash.z.wallet.sdk.rpc.LightdInfo
10, // 38: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:output_type -> cash.z.wallet.sdk.rpc.PingResponse
22, // [22:39] is the sub-list for method output_type
5, // [5:22] is the sub-list for method input_type
5, // [5:5] is the sub-list for extension type_name
5, // [5:5] is the sub-list for extension extendee
0, // [0:5] is the sub-list for field type_name

View File

@ -188,6 +188,10 @@ service CompactTxStreamer {
// in the exclude list that don't exist in the mempool are ignored.
rpc GetMempoolTx(Exclude) returns (stream CompactTx) {}
// Return a stream of current Mempool transactions. This will keep the output stream open while
// there are mempool transactions. It will close the returned stream when a new block is mined.
rpc GetMempoolStream(Empty) returns (stream RawTransaction) {}
// GetTreeState returns the note commitment tree state corresponding to the given block.
// See section 3.7 of the Zcash protocol specification. It returns several other useful
// values also (even though they can be obtained using GetBlock).

View File

@ -45,6 +45,7 @@ type CompactTxStreamerClient interface {
// match a shortened txid, they are all sent (none is excluded). Transactions
// in the exclude list that don't exist in the mempool are ignored.
GetMempoolTx(ctx context.Context, in *Exclude, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolTxClient, error)
GetMempoolStream(ctx context.Context, in *Empty, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolStreamClient, error)
// GetTreeState returns the note commitment tree state corresponding to the given block.
// See section 3.7 of the Zcash protocol specification. It returns several other useful
// values also (even though they can be obtained using GetBlock).
@ -259,6 +260,38 @@ func (x *compactTxStreamerGetMempoolTxClient) Recv() (*CompactTx, error) {
return m, nil
}
func (c *compactTxStreamerClient) GetMempoolStream(ctx context.Context, in *Empty, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[4], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetMempoolStream", opts...)
if err != nil {
return nil, err
}
x := &compactTxStreamerGetMempoolStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type CompactTxStreamer_GetMempoolStreamClient interface {
Recv() (*RawTransaction, error)
grpc.ClientStream
}
type compactTxStreamerGetMempoolStreamClient struct {
grpc.ClientStream
}
func (x *compactTxStreamerGetMempoolStreamClient) Recv() (*RawTransaction, error) {
m := new(RawTransaction)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *compactTxStreamerClient) GetTreeState(ctx context.Context, in *BlockID, opts ...grpc.CallOption) (*TreeState, error) {
out := new(TreeState)
err := c.cc.Invoke(ctx, "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetTreeState", in, out, opts...)
@ -278,7 +311,7 @@ func (c *compactTxStreamerClient) GetAddressUtxos(ctx context.Context, in *GetAd
}
func (c *compactTxStreamerClient) GetAddressUtxosStream(ctx context.Context, in *GetAddressUtxosArg, opts ...grpc.CallOption) (CompactTxStreamer_GetAddressUtxosStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[4], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetAddressUtxosStream", opts...)
stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[5], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetAddressUtxosStream", opts...)
if err != nil {
return nil, err
}
@ -358,6 +391,7 @@ type CompactTxStreamerServer interface {
// match a shortened txid, they are all sent (none is excluded). Transactions
// in the exclude list that don't exist in the mempool are ignored.
GetMempoolTx(*Exclude, CompactTxStreamer_GetMempoolTxServer) error
GetMempoolStream(*Empty, CompactTxStreamer_GetMempoolStreamServer) error
// GetTreeState returns the note commitment tree state corresponding to the given block.
// See section 3.7 of the Zcash protocol specification. It returns several other useful
// values also (even though they can be obtained using GetBlock).
@ -409,6 +443,9 @@ func (UnimplementedCompactTxStreamerServer) GetTaddressBalanceStream(CompactTxSt
func (UnimplementedCompactTxStreamerServer) GetMempoolTx(*Exclude, CompactTxStreamer_GetMempoolTxServer) error {
return status.Errorf(codes.Unimplemented, "method GetMempoolTx not implemented")
}
func (UnimplementedCompactTxStreamerServer) GetMempoolStream(*Empty, CompactTxStreamer_GetMempoolStreamServer) error {
return status.Errorf(codes.Unimplemented, "method GetMempoolStream not implemented")
}
func (UnimplementedCompactTxStreamerServer) GetTreeState(context.Context, *BlockID) (*TreeState, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetTreeState not implemented")
}
@ -652,6 +689,27 @@ func (x *compactTxStreamerGetMempoolTxServer) Send(m *CompactTx) error {
return x.ServerStream.SendMsg(m)
}
func _CompactTxStreamer_GetMempoolStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Empty)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(CompactTxStreamerServer).GetMempoolStream(m, &compactTxStreamerGetMempoolStreamServer{stream})
}
type CompactTxStreamer_GetMempoolStreamServer interface {
Send(*RawTransaction) error
grpc.ServerStream
}
type compactTxStreamerGetMempoolStreamServer struct {
grpc.ServerStream
}
func (x *compactTxStreamerGetMempoolStreamServer) Send(m *RawTransaction) error {
return x.ServerStream.SendMsg(m)
}
func _CompactTxStreamer_GetTreeState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BlockID)
if err := dec(in); err != nil {
@ -818,6 +876,11 @@ var CompactTxStreamer_ServiceDesc = grpc.ServiceDesc{
Handler: _CompactTxStreamer_GetMempoolTx_Handler,
ServerStreams: true,
},
{
StreamName: "GetMempoolStream",
Handler: _CompactTxStreamer_GetMempoolStream_Handler,
ServerStreams: true,
},
{
StreamName: "GetAddressUtxosStream",
Handler: _CompactTxStreamer_GetAddressUtxosStream_Handler,