modifications to adityapk00 mempool streaming API

This commit is based on adityapk00 streaming mempool interface but
avoids using goroutines, which are difficult to reason about.

Co-authored-by: Aditya Kulkarni <adityapk@gmail.com>
This commit is contained in:
Larry Ruane 2021-07-19 11:04:09 -06:00
parent 5e13c60d59
commit 2b4f3e6f9b
7 changed files with 249 additions and 212 deletions

View File

@ -267,10 +267,6 @@ func startServer(opts *common.Options) error {
walletrpc.RegisterDarksideStreamerServer(server, service)
}
// Initialize mempool monitor
exitMempool := make(chan bool)
common.StartMempoolMonitor(cache, exitMempool)
// Start listening
listener, err := net.Listen("tcp", opts.GRPCBindAddr)
if err != nil {
@ -286,7 +282,6 @@ func startServer(opts *common.Options) error {
go func() {
s := <-signals
cache.Sync()
exitMempool <- true
common.Log.WithFields(logrus.Fields{
"signal": s.String(),
}).Info("caught signal, stopping gRPC server")

View File

@ -77,6 +77,7 @@ type (
Chain string
Upgrades map[string]Upgradeinfo
Blocks int
BestBlockHash string
Consensus ConsensusInfo
EstimatedHeight int
}

View File

@ -4,174 +4,152 @@ import (
"encoding/hex"
"encoding/json"
"sync"
"sync/atomic"
"time"
"github.com/zcash/lightwalletd/walletrpc"
)
type txid string
var (
// List of all mempool transactions
txns map[string]*walletrpc.RawTransaction = make(map[string]*walletrpc.RawTransaction)
// Set of mempool txids that have been seen during the current block interval.
// The zcashd RPC `getrawmempool` returns the entire mempool each time, so
// this allows us to ignore the txids that we've already seen.
g_txidSeen map[txid]struct{} = map[txid]struct{}{}
// List of all clients waiting to recieve mempool txns
clients []chan<- *walletrpc.RawTransaction
// List of transactions during current block interval, in order received. Each
// client thread can keep an index into this slice to record which transactions
// it's sent back to the client (everything before that index). The g_txidSeen
// map allows this list to not contain duplicates.
g_txList []*walletrpc.RawTransaction
// Last height of the blocks. If this changes, then close all the clients and flush the mempool
lastHeight int
// The most recent absolute time that we fetched the mempool and the latest
// (tip) block hash (so we know when a new block has been mined).
g_lastTime time.Time
// A pointer to the blockcache
blockcache *BlockCache
// The most recent zcashd getblockchaininfo reply, for height and best block
// hash (tip) which is used to detect when a new block arrives.
g_lastBlockChainInfo *ZcashdRpcReplyGetblockchaininfo = &ZcashdRpcReplyGetblockchaininfo{}
// Mutex to lock the above 2 structs
lock sync.Mutex
// Since the mutex doesn't have a "try_lock" method, we'll have to improvize with this
refreshing int32 = 0
// Mutex to protect the above variables.
g_lock sync.Mutex
)
// AddNewClient adds a new client to the list of clients to notify for mempool txns
func AddNewClient(client chan<- *walletrpc.RawTransaction) {
lock.Lock()
defer lock.Unlock()
func GetMempool(sendToClient func(*walletrpc.RawTransaction) error) error {
g_lock.Lock()
index := 0
// Stay in this function until the tip block hash changes.
stayHash := g_lastBlockChainInfo.BestBlockHash
//Log.Infoln("Adding new client, sending ", len(txns), " transactions")
// Also send all pending mempool txns
for _, rtx := range txns {
if client != nil {
client <- rtx
// Wait for more transactions to be added to the list
for {
// Don't fetch the mempool more often than every 2 seconds.
if time.Since(g_lastTime) > 2*time.Second {
blockChainInfo, err := getLatestBlockChainInfo()
if err != nil {
g_lock.Unlock()
return err
}
if g_lastBlockChainInfo.BestBlockHash != blockChainInfo.BestBlockHash {
// A new block has arrived
g_lastBlockChainInfo = blockChainInfo
Log.Infoln("Latest Block changed, clearing everything")
// We're the first thread to notice, clear cached state.
g_txidSeen = map[txid]struct{}{}
g_txList = []*walletrpc.RawTransaction{}
g_lastTime = time.Time{}
break
}
if err = refreshMempoolTxns(); err != nil {
g_lock.Unlock()
return err
}
g_lastTime = time.Now()
}
// Send transactions we haven't sent yet, best to not do so while
// holding the mutex, since this call may get flow-controlled.
toSend := g_txList[index:]
index = len(g_txList)
g_lock.Unlock()
for _, tx := range toSend {
if err := sendToClient(tx); err != nil {
return err
}
}
time.Sleep(200 * time.Millisecond)
g_lock.Lock()
if g_lastBlockChainInfo.BestBlockHash != stayHash {
break
}
}
if client != nil {
clients = append(clients, client)
}
g_lock.Unlock()
return nil
}
// RefreshMempoolTxns gets all new mempool txns and sends any new ones to waiting clients
func refreshMempoolTxns() error {
Log.Infoln("Refreshing mempool")
// First check if another refresh is running, if it is, just return
if !atomic.CompareAndSwapInt32(&refreshing, 0, 1) {
Log.Warnln("Another refresh in progress, returning")
return nil
}
// Set refreshing to 0 when we exit
defer func() {
refreshing = 0
}()
// Check if the blockchain has changed, and if it has, then clear everything
lock.Lock()
defer lock.Unlock()
if lastHeight < blockcache.GetLatestHeight() {
Log.Infoln("Block height changed, clearing everything")
// Flush all the clients
for _, client := range clients {
if client != nil {
close(client)
}
}
clients = make([]chan<- *walletrpc.RawTransaction, 0)
// Clear txns
txns = make(map[string]*walletrpc.RawTransaction)
lastHeight = blockcache.GetLatestHeight()
}
var mempoolList []string
params := make([]json.RawMessage, 0)
params := []json.RawMessage{}
result, rpcErr := RawRequest("getrawmempool", params)
if rpcErr != nil {
return rpcErr
}
var mempoolList []string
err := json.Unmarshal(result, &mempoolList)
if err != nil {
return err
}
//println("getrawmempool size ", len(mempoolList))
// Fetch all new mempool txns and add them into `newTxns`
for _, txidstr := range mempoolList {
if _, ok := txns[txidstr]; !ok {
txidJSON, err := json.Marshal(txidstr)
if err != nil {
return err
}
// The "0" is because we only need the raw hex, which is returned as
// just a hex string, and not even a json string (with quotes).
params := []json.RawMessage{txidJSON, json.RawMessage("0")}
result, rpcErr := RawRequest("getrawtransaction", params)
if rpcErr != nil {
// Not an error; mempool transactions can disappear
continue
}
// strip the quotes
var txStr string
err = json.Unmarshal(result, &txStr)
if err != nil {
return err
}
// conver to binary
txBytes, err := hex.DecodeString(txStr)
if err != nil {
return err
}
newRtx := &walletrpc.RawTransaction{
Data: txBytes,
Height: uint64(lastHeight),
}
// Notify waiting clients
for _, client := range clients {
if client != nil {
client <- newRtx
}
}
Log.Infoln("Adding new mempool txid", txidstr, " sending to ", len(clients), " clients")
txns[txidstr] = newRtx
if _, ok := g_txidSeen[txid(txidstr)]; ok {
// We've already fetched this transaction
continue
}
g_txidSeen[txid(txidstr)] = struct{}{}
// We haven't fetched this transaction already.
txidJSON, err := json.Marshal(txidstr)
if err != nil {
return err
}
// The "0" is because we only need the raw hex, which is returned as
// just a hex string, and not even a json string (with quotes).
params := []json.RawMessage{txidJSON, json.RawMessage("0")}
result, rpcErr := RawRequest("getrawtransaction", params)
if rpcErr != nil {
// Not an error; mempool transactions can disappear
continue
}
// strip the quotes
var txStr string
err = json.Unmarshal(result, &txStr)
if err != nil {
return err
}
txBytes, err := hex.DecodeString(txStr)
if err != nil {
return err
}
Log.Infoln("appending", txidstr)
newRtx := &walletrpc.RawTransaction{
Data: txBytes,
Height: uint64(g_lastBlockChainInfo.Blocks),
}
g_txList = append(g_txList, newRtx)
}
return nil
}
// StartMempoolMonitor starts monitoring the mempool
func StartMempoolMonitor(cache *BlockCache, done <-chan bool) {
go func() {
ticker := time.NewTicker(2 * time.Second)
blockcache = cache
lastHeight = blockcache.GetLatestHeight()
for {
select {
case <-ticker.C:
go func() {
//Log.Infoln("Ticker triggered")
err := refreshMempoolTxns()
if err != nil {
Log.Errorln("Mempool refresh error:", err.Error())
}
}()
case <-done:
for _, client := range clients {
close(client)
}
return
}
}
}()
func getLatestBlockChainInfo() (*ZcashdRpcReplyGetblockchaininfo, error) {
result, rpcErr := RawRequest("getblockchaininfo", []json.RawMessage{})
if rpcErr != nil {
return nil, rpcErr
}
var getblockchaininfoReply ZcashdRpcReplyGetblockchaininfo
err := json.Unmarshal(result, &getblockchaininfoReply)
if err != nil {
return nil, err
}
return &getblockchaininfoReply, nil
}

View File

@ -381,24 +381,10 @@ func (s *lwdStreamer) GetTaddressBalanceStream(addresses walletrpc.CompactTxStre
}
func (s *lwdStreamer) GetMempoolStream(_empty *walletrpc.Empty, resp walletrpc.CompactTxStreamer_GetMempoolStreamServer) error {
ch := make(chan *walletrpc.RawTransaction, 200)
go common.AddNewClient(ch)
for {
select {
case rtx, more := <-ch:
if !more || rtx == nil {
return nil
}
if resp.Send(rtx) != nil {
return nil
}
// Timeout after 5 mins
case <-time.After(5 * time.Minute):
return nil
}
}
err := common.GetMempool(func(tx *walletrpc.RawTransaction) error {
return resp.Send(tx)
})
return err
}
// Key is 32-byte txid (as a 64-character string), data is pointer to compact tx.

View File

@ -205,7 +205,8 @@ func (x *TxFilter) GetHash() []byte {
}
// RawTransaction contains the complete transaction data. It also optionally includes
// the block height in which the transaction was included.
// the block height in which the transaction was included, or, when returned
// by GetMempoolStream(), the latest block height.
type RawTransaction struct {
state protoimpl.MessageState
sizeCache protoimpl.SizeCache
@ -1303,7 +1304,7 @@ var file_service_proto_rawDesc = []byte{
0x0b, 0x32, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65,
0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64,
0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x52, 0x0c,
0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x32, 0xbb, 0x0a, 0x0a,
0x61, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x32, 0x98, 0x0b, 0x0a,
0x11, 0x43, 0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x54, 0x78, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x65, 0x72, 0x12, 0x54, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x4c, 0x61, 0x74, 0x65, 0x73, 0x74, 0x42,
0x6c, 0x6f, 0x63, 0x6b, 0x12, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61,
@ -1357,39 +1358,45 @@ var file_service_proto_rawDesc = []byte{
0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x45,
0x78, 0x63, 0x6c, 0x75, 0x64, 0x65, 0x1a, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e,
0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x43,
0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x54, 0x78, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0c,
0x47, 0x65, 0x74, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1e, 0x2e, 0x63,
0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x44, 0x1a, 0x20, 0x2e, 0x63,
0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b,
0x2e, 0x72, 0x70, 0x63, 0x2e, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x00,
0x12, 0x6f, 0x0a, 0x0f, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74,
0x78, 0x6f, 0x73, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c,
0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2f,
0x6f, 0x6d, 0x70, 0x61, 0x63, 0x74, 0x54, 0x78, 0x22, 0x00, 0x30, 0x01, 0x12, 0x5b, 0x0a, 0x10,
0x47, 0x65, 0x74, 0x4d, 0x65, 0x6d, 0x70, 0x6f, 0x6f, 0x6c, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d,
0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74,
0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x25,
0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73,
0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73,
0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x4c, 0x69, 0x73, 0x74, 0x22,
0x00, 0x12, 0x73, 0x0a, 0x15, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55,
0x74, 0x78, 0x6f, 0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73,
0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72,
0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78,
0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77,
0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65,
0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70,
0x6c, 0x79, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x67,
0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a,
0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x52, 0x61, 0x77, 0x54, 0x72, 0x61, 0x6e, 0x73, 0x61,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x22, 0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0c, 0x47, 0x65, 0x74,
0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x12, 0x1e, 0x2e, 0x63, 0x61, 0x73, 0x68,
0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x42, 0x6c, 0x6f, 0x63, 0x6b, 0x49, 0x44, 0x1a, 0x20, 0x2e, 0x63, 0x61, 0x73, 0x68,
0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70,
0x63, 0x2e, 0x54, 0x72, 0x65, 0x65, 0x53, 0x74, 0x61, 0x74, 0x65, 0x22, 0x00, 0x12, 0x6f, 0x0a,
0x0f, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73,
0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74,
0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72,
0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41, 0x72, 0x67, 0x1a, 0x2f, 0x2e, 0x63, 0x61,
0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e,
0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74,
0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x4c, 0x69, 0x73, 0x74, 0x22, 0x00, 0x12, 0x73,
0x0a, 0x15, 0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f,
0x73, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x29, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a,
0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e,
0x45, 0x6d, 0x70, 0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77,
0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69,
0x67, 0x68, 0x74, 0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x04, 0x50, 0x69,
0x6e, 0x67, 0x12, 0x1f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c,
0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74,
0x69, 0x6f, 0x6e, 0x1a, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c,
0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x50, 0x69, 0x6e, 0x67,
0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x1b, 0x5a, 0x16, 0x6c, 0x69,
0x67, 0x68, 0x74, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x64, 0x2f, 0x77, 0x61, 0x6c, 0x6c, 0x65,
0x74, 0x72, 0x70, 0x63, 0xba, 0x02, 0x00, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x47, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x41,
0x72, 0x67, 0x1a, 0x2b, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c,
0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x47, 0x65, 0x74, 0x41, 0x64,
0x64, 0x72, 0x65, 0x73, 0x73, 0x55, 0x74, 0x78, 0x6f, 0x73, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x22,
0x00, 0x30, 0x01, 0x12, 0x52, 0x0a, 0x0d, 0x47, 0x65, 0x74, 0x4c, 0x69, 0x67, 0x68, 0x74, 0x64,
0x49, 0x6e, 0x66, 0x6f, 0x12, 0x1c, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61,
0x6c, 0x6c, 0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x45, 0x6d, 0x70,
0x74, 0x79, 0x1a, 0x21, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c,
0x65, 0x74, 0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x4c, 0x69, 0x67, 0x68, 0x74,
0x64, 0x49, 0x6e, 0x66, 0x6f, 0x22, 0x00, 0x12, 0x4e, 0x0a, 0x04, 0x50, 0x69, 0x6e, 0x67, 0x12,
0x1f, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x2e,
0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x44, 0x75, 0x72, 0x61, 0x74, 0x69, 0x6f, 0x6e,
0x1a, 0x23, 0x2e, 0x63, 0x61, 0x73, 0x68, 0x2e, 0x7a, 0x2e, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74,
0x2e, 0x73, 0x64, 0x6b, 0x2e, 0x72, 0x70, 0x63, 0x2e, 0x50, 0x69, 0x6e, 0x67, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x1b, 0x5a, 0x16, 0x6c, 0x69, 0x67, 0x68, 0x74,
0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x64, 0x2f, 0x77, 0x61, 0x6c, 0x6c, 0x65, 0x74, 0x72, 0x70,
0x63, 0xba, 0x02, 0x00, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (
@ -1443,27 +1450,29 @@ var file_service_proto_depIdxs = []int32{
12, // 11: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:input_type -> cash.z.wallet.sdk.rpc.AddressList
11, // 12: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:input_type -> cash.z.wallet.sdk.rpc.Address
14, // 13: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:input_type -> cash.z.wallet.sdk.rpc.Exclude
0, // 14: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:input_type -> cash.z.wallet.sdk.rpc.BlockID
16, // 15: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
16, // 16: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
6, // 17: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:input_type -> cash.z.wallet.sdk.rpc.Empty
9, // 18: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:input_type -> cash.z.wallet.sdk.rpc.Duration
0, // 19: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLatestBlock:output_type -> cash.z.wallet.sdk.rpc.BlockID
19, // 20: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlock:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
19, // 21: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlockRange:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
3, // 22: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTransaction:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
4, // 23: cash.z.wallet.sdk.rpc.CompactTxStreamer.SendTransaction:output_type -> cash.z.wallet.sdk.rpc.SendResponse
3, // 24: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressTxids:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
13, // 25: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:output_type -> cash.z.wallet.sdk.rpc.Balance
13, // 26: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:output_type -> cash.z.wallet.sdk.rpc.Balance
20, // 27: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:output_type -> cash.z.wallet.sdk.rpc.CompactTx
15, // 28: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:output_type -> cash.z.wallet.sdk.rpc.TreeState
18, // 29: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReplyList
17, // 30: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReply
7, // 31: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:output_type -> cash.z.wallet.sdk.rpc.LightdInfo
10, // 32: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:output_type -> cash.z.wallet.sdk.rpc.PingResponse
19, // [19:33] is the sub-list for method output_type
5, // [5:19] is the sub-list for method input_type
6, // 14: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolStream:input_type -> cash.z.wallet.sdk.rpc.Empty
0, // 15: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:input_type -> cash.z.wallet.sdk.rpc.BlockID
16, // 16: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
16, // 17: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:input_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosArg
6, // 18: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:input_type -> cash.z.wallet.sdk.rpc.Empty
9, // 19: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:input_type -> cash.z.wallet.sdk.rpc.Duration
0, // 20: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLatestBlock:output_type -> cash.z.wallet.sdk.rpc.BlockID
19, // 21: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlock:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
19, // 22: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetBlockRange:output_type -> cash.z.wallet.sdk.rpc.CompactBlock
3, // 23: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTransaction:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
4, // 24: cash.z.wallet.sdk.rpc.CompactTxStreamer.SendTransaction:output_type -> cash.z.wallet.sdk.rpc.SendResponse
3, // 25: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressTxids:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
13, // 26: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalance:output_type -> cash.z.wallet.sdk.rpc.Balance
13, // 27: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTaddressBalanceStream:output_type -> cash.z.wallet.sdk.rpc.Balance
20, // 28: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolTx:output_type -> cash.z.wallet.sdk.rpc.CompactTx
3, // 29: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetMempoolStream:output_type -> cash.z.wallet.sdk.rpc.RawTransaction
15, // 30: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetTreeState:output_type -> cash.z.wallet.sdk.rpc.TreeState
18, // 31: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxos:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReplyList
17, // 32: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetAddressUtxosStream:output_type -> cash.z.wallet.sdk.rpc.GetAddressUtxosReply
7, // 33: cash.z.wallet.sdk.rpc.CompactTxStreamer.GetLightdInfo:output_type -> cash.z.wallet.sdk.rpc.LightdInfo
10, // 34: cash.z.wallet.sdk.rpc.CompactTxStreamer.Ping:output_type -> cash.z.wallet.sdk.rpc.PingResponse
20, // [20:35] is the sub-list for method output_type
5, // [5:20] is the sub-list for method input_type
5, // [5:5] is the sub-list for extension type_name
5, // [5:5] is the sub-list for extension extendee
0, // [0:5] is the sub-list for field type_name

View File

@ -32,7 +32,8 @@ message TxFilter {
}
// RawTransaction contains the complete transaction data. It also optionally includes
// the block height in which the transaction was included.
// the block height in which the transaction was included, or, when returned
// by GetMempoolStream(), the latest block height.
message RawTransaction {
bytes data = 1; // exact data returned by Zcash 'getrawtransaction'
uint64 height = 2; // height that the transaction was mined (or -1)

View File

@ -42,6 +42,9 @@ type CompactTxStreamerClient interface {
// match a shortened txid, they are all sent (none is excluded). Transactions
// in the exclude list that don't exist in the mempool are ignored.
GetMempoolTx(ctx context.Context, in *Exclude, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolTxClient, error)
// Return a stream of current Mempool transactions. This will keep the output stream open while
// there are mempool transactions. It will close the returned stream when a new block is mined.
GetMempoolStream(ctx context.Context, in *Empty, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolStreamClient, error)
// GetTreeState returns the note commitment tree state corresponding to the given block.
// See section 3.7 of the Zcash protocol specification. It returns several other useful
// values also (even though they can be obtained using GetBlock).
@ -238,6 +241,38 @@ func (x *compactTxStreamerGetMempoolTxClient) Recv() (*CompactTx, error) {
return m, nil
}
func (c *compactTxStreamerClient) GetMempoolStream(ctx context.Context, in *Empty, opts ...grpc.CallOption) (CompactTxStreamer_GetMempoolStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[4], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetMempoolStream", opts...)
if err != nil {
return nil, err
}
x := &compactTxStreamerGetMempoolStreamClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type CompactTxStreamer_GetMempoolStreamClient interface {
Recv() (*RawTransaction, error)
grpc.ClientStream
}
type compactTxStreamerGetMempoolStreamClient struct {
grpc.ClientStream
}
func (x *compactTxStreamerGetMempoolStreamClient) Recv() (*RawTransaction, error) {
m := new(RawTransaction)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *compactTxStreamerClient) GetTreeState(ctx context.Context, in *BlockID, opts ...grpc.CallOption) (*TreeState, error) {
out := new(TreeState)
err := c.cc.Invoke(ctx, "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetTreeState", in, out, opts...)
@ -257,7 +292,7 @@ func (c *compactTxStreamerClient) GetAddressUtxos(ctx context.Context, in *GetAd
}
func (c *compactTxStreamerClient) GetAddressUtxosStream(ctx context.Context, in *GetAddressUtxosArg, opts ...grpc.CallOption) (CompactTxStreamer_GetAddressUtxosStreamClient, error) {
stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[4], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetAddressUtxosStream", opts...)
stream, err := c.cc.NewStream(ctx, &CompactTxStreamer_ServiceDesc.Streams[5], "/cash.z.wallet.sdk.rpc.CompactTxStreamer/GetAddressUtxosStream", opts...)
if err != nil {
return nil, err
}
@ -334,6 +369,9 @@ type CompactTxStreamerServer interface {
// match a shortened txid, they are all sent (none is excluded). Transactions
// in the exclude list that don't exist in the mempool are ignored.
GetMempoolTx(*Exclude, CompactTxStreamer_GetMempoolTxServer) error
// Return a stream of current Mempool transactions. This will keep the output stream open while
// there are mempool transactions. It will close the returned stream when a new block is mined.
GetMempoolStream(*Empty, CompactTxStreamer_GetMempoolStreamServer) error
// GetTreeState returns the note commitment tree state corresponding to the given block.
// See section 3.7 of the Zcash protocol specification. It returns several other useful
// values also (even though they can be obtained using GetBlock).
@ -379,6 +417,9 @@ func (UnimplementedCompactTxStreamerServer) GetTaddressBalanceStream(CompactTxSt
func (UnimplementedCompactTxStreamerServer) GetMempoolTx(*Exclude, CompactTxStreamer_GetMempoolTxServer) error {
return status.Errorf(codes.Unimplemented, "method GetMempoolTx not implemented")
}
func (UnimplementedCompactTxStreamerServer) GetMempoolStream(*Empty, CompactTxStreamer_GetMempoolStreamServer) error {
return status.Errorf(codes.Unimplemented, "method GetMempoolStream not implemented")
}
func (UnimplementedCompactTxStreamerServer) GetTreeState(context.Context, *BlockID) (*TreeState, error) {
return nil, status.Errorf(codes.Unimplemented, "method GetTreeState not implemented")
}
@ -586,6 +627,27 @@ func (x *compactTxStreamerGetMempoolTxServer) Send(m *CompactTx) error {
return x.ServerStream.SendMsg(m)
}
func _CompactTxStreamer_GetMempoolStream_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(Empty)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(CompactTxStreamerServer).GetMempoolStream(m, &compactTxStreamerGetMempoolStreamServer{stream})
}
type CompactTxStreamer_GetMempoolStreamServer interface {
Send(*RawTransaction) error
grpc.ServerStream
}
type compactTxStreamerGetMempoolStreamServer struct {
grpc.ServerStream
}
func (x *compactTxStreamerGetMempoolStreamServer) Send(m *RawTransaction) error {
return x.ServerStream.SendMsg(m)
}
func _CompactTxStreamer_GetTreeState_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(BlockID)
if err := dec(in); err != nil {
@ -744,6 +806,11 @@ var CompactTxStreamer_ServiceDesc = grpc.ServiceDesc{
Handler: _CompactTxStreamer_GetMempoolTx_Handler,
ServerStreams: true,
},
{
StreamName: "GetMempoolStream",
Handler: _CompactTxStreamer_GetMempoolStream_Handler,
ServerStreams: true,
},
{
StreamName: "GetAddressUtxosStream",
Handler: _CompactTxStreamer_GetAddressUtxosStream_Handler,