Updated golang client (#58)

* Updated golang client

Golang client now works with the bidi stream and provide similar command line arugments to the Rust client.

* Added keepalive params

* Make the golang args standardized with the rust lcient

See #44.

* Make endpoint parsing follow our standard format

Parses endpoint as URL instead of host:port combo.
This commit is contained in:
Linus Kendall 2023-02-22 17:27:48 +05:30 committed by GitHub
parent 15640975c7
commit adc60a9c99
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 2754 additions and 336 deletions

View File

@ -1,2 +1,2 @@
protoc:
protoc --go_out=./proto --go_opt=paths=source_relative --go-grpc_out=./proto --go-grpc_opt=paths=source_relative --proto_path ../proto/ geyser.proto
protoc --go_out=./proto --go_opt=paths=source_relative --go-grpc_out=./proto --go-grpc_opt=paths=source_relative --proto_path ../proto/ ../proto/*.proto

View File

@ -0,0 +1,21 @@
# Golang client for Solana gRPC interface
This is a sample golang client for the Solana gRPC interface.
Requires golang 1.17
Sample usage:
```
go1.17 run ./cmd/grpc-client/ -grpc api.rpcpool.com:443 -token <token> -slots
```
You can also make non SSL connections:
```
go1.17 run ./cmd/grpc-client/ -insecure -grpc api.rpcpool.com:80 -token <token> -blocks
````
## Updating protofiles
You can run `make` to update the protofiles.

View File

@ -0,0 +1,12 @@
package main
type arrayFlags []string
func (i *arrayFlags) String() string {
return "string representation of flag"
}
func (i *arrayFlags) Set(value string) error {
*i = append(*i, value)
return nil
}

View File

@ -2,30 +2,43 @@ package main
import (
"context"
"crypto/md5"
"encoding/base64"
"crypto/x509"
"encoding/json"
"flag"
"io"
"log"
"sync"
"net/url"
"os"
"time"
"github.com/gagliardetto/solana-go"
"github.com/gagliardetto/solana-go/rpc"
pb "github.com/solana-geyser-grpc/golang/proto"
pb "github.com/rpcpool/solana-geyser-grpc/golang/proto"
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
)
var (
grpcAddr = flag.String("grpc", "", "Solana gRPC address")
account = flag.String("account", "", "Account to subscribe to")
token = flag.String("token", "", "set token")
accountData = flag.Bool("account-data", true, "Include data")
grpcAddr = flag.String("endpoint", "", "Solana gRPC address, in URI format e.g. https://api.rpcpool.com")
token = flag.String("x-token", "", "Token for authenticating")
jsonInput = flag.String("json", "", "JSON for subscription request, prefix with @ to read json from file")
insecureConnection = flag.Bool("insecure", false, "Connect without TLS")
slots = flag.Bool("slots", false, "Subscribe to slots update")
blocks = flag.Bool("blocks", false, "Subscribe to block update")
block_meta = flag.Bool("blocks-meta", false, "Subscribe to block metadata update")
signature = flag.String("signature", "", "Subscribe to a specific transaction signature")
grpc_slot map[uint64]uint = make(map[uint64]uint)
accounts = flag.Bool("accounts", false, "Subscribe to accounts")
transactions = flag.Bool("transactions", false, "Subscribe to transactions, required for tx_account_include/tx_account_exclude and vote/failed.")
voteTransactions = flag.Bool("transactions-vote", false, "Include vote transactions")
failedTransactions = flag.Bool("transactions-failed", false, "Include failed transactions")
accountsFilter arrayFlags
accountOwnersFilter arrayFlags
transactionsAccountsInclude arrayFlags
transactionsAccountsExclude arrayFlags
)
var kacp = keepalive.ClientParameters{
@ -34,38 +47,193 @@ var kacp = keepalive.ClientParameters{
PermitWithoutStream: true, // send pings even without active streams
}
func main() {
log.SetFlags(0)
flag.Var(&accountsFilter, "accounts-account", "Subscribe to an account, may be specified multiple times to subscribe to multiple accounts.")
flag.Var(&accountOwnersFilter, "accounts-owner", "Subscribe to an account owner, may be specified multiple times to subscribe to multiple account owners.")
flag.Var(&transactionsAccountsInclude, "transactions-account-include", "Subscribe to transactions mentioning an account, may be specified multiple times to subscribe to multiple accounts.")
flag.Var(&transactionsAccountsExclude, "transactions-account-exclude", "Subscribe to transactions not mentioning an account, may be specified multiple times to exclude multiple accounts.")
flag.Parse()
grpc_client()
}
func grpc_client() {
log.Println("Starting grpc client")
conn, err := grpc.Dial(*grpcAddr, grpc.WithTransportCredentials(insecure.NewCredentials()), grpc.WithKeepaliveParams(kacp))
if err != nil {
panic(err)
if *grpcAddr == "" {
log.Fatalf("GRPC address is required. Please provide --endpoint parameter.")
}
u, err := url.Parse(*grpcAddr)
if err != nil {
log.Fatalf("Invalid GRPC address provided: %v", err)
}
// Infer insecure connection if http is given
if u.Scheme == "http" {
*insecureConnection = true
}
port := u.Port()
if port == "" {
if *insecureConnection {
port = "80"
} else {
port = "443"
}
}
hostname := u.Hostname()
if hostname == "" {
log.Fatalf("Please provide URL format endpoint e.g. http(s)://<endpoint>:<port>")
}
address := hostname + ":" + port
conn := grpc_connect(address, *insecureConnection)
defer conn.Close()
grpc_subscribe(conn)
}
func grpc_connect(address string, plaintext bool) *grpc.ClientConn {
var opts []grpc.DialOption
if plaintext {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
} else {
pool, _ := x509.SystemCertPool()
creds := credentials.NewClientTLSFromCert(pool, "")
opts = append(opts, grpc.WithTransportCredentials(creds))
}
opts = append(opts, grpc.WithKeepaliveParams(kacp))
log.Println("Starting grpc client, connecting to", address)
conn, err := grpc.Dial(address, opts...)
if err != nil {
log.Fatalf("fail to dial: %v", err)
}
return conn
}
func grpc_subscribe(conn *grpc.ClientConn) {
var err error
client := pb.NewGeyserClient(conn)
subr2 := pb.SubscribeRequest{}
subr2.Accounts = make(map[string]*pb.SubscribeRequestFilterAccounts)
subr2.Accounts["subscription"] = &pb.SubscribeRequestFilterAccounts{Account: []string{*account}}
var subscription pb.SubscribeRequest
ctx := context.Background()
if *token != "" {
md := metadata.New(map[string]string{"x-token": *token})
ctx = metadata.NewOutgoingContext(ctx, md)
}
// Read json input or JSON file prefixed with @
if *jsonInput != "" {
var jsonData []byte
stream, err := client.Subscribe(ctx, &subr2)
if err != nil {
panic(err)
if (*jsonInput)[0] == '@' {
jsonData, err = os.ReadFile((*jsonInput)[1:])
if err != nil {
log.Fatalf("Error reading provided json file: %v", err)
}
} else {
jsonData = []byte(*jsonInput)
}
err := json.Unmarshal(jsonData, &subscription)
if err != nil {
log.Fatalf("Error parsing JSON: %v", err)
}
} else {
// If no JSON provided, start with blank
subscription = pb.SubscribeRequest{}
}
// We append to the JSON provided maps. If JSON provides a map item
// with the exact same ID, then this will override that sub.
if *slots {
if subscription.Slots == nil {
subscription.Slots = make(map[string]*pb.SubscribeRequestFilterSlots)
}
subscription.Slots["slots"] = &pb.SubscribeRequestFilterSlots{}
}
if *blocks {
if subscription.Blocks == nil {
subscription.Blocks = make(map[string]*pb.SubscribeRequestFilterBlocks)
}
subscription.Blocks["blocks"] = &pb.SubscribeRequestFilterBlocks{}
}
if *block_meta {
if subscription.BlocksMeta == nil {
subscription.BlocksMeta = make(map[string]*pb.SubscribeRequestFilterBlocksMeta)
}
subscription.BlocksMeta["block_meta"] = &pb.SubscribeRequestFilterBlocksMeta{}
}
if (len(accountsFilter)+len(accountOwnersFilter)) > 0 || (*accounts) {
if subscription.Accounts == nil {
subscription.Accounts = make(map[string]*pb.SubscribeRequestFilterAccounts)
}
subscription.Accounts["account_sub"] = &pb.SubscribeRequestFilterAccounts{}
if len(accountsFilter) > 0 {
subscription.Accounts["account_sub"].Account = accountsFilter
}
if len(accountOwnersFilter) > 0 {
subscription.Accounts["account_sub"].Owner = accountOwnersFilter
}
}
// Set up the transactions subscription
if subscription.Transactions == nil {
subscription.Transactions = make(map[string]*pb.SubscribeRequestFilterTransactions)
}
// Subscribe to a specific signature
if *signature != "" {
tr := true
subscription.Transactions["signature_sub"] = &pb.SubscribeRequestFilterTransactions{
Failed: &tr,
Vote: &tr,
}
if *signature != "" {
subscription.Transactions["signature_sub"].Signature = signature
}
}
// Subscribe to generic transaction stream
if *transactions {
subscription.Transactions["transactions_sub"] = &pb.SubscribeRequestFilterTransactions{
Failed: failedTransactions,
Vote: voteTransactions,
}
subscription.Transactions["transactions_sub"].AccountInclude = transactionsAccountsInclude
subscription.Transactions["transactions_sub"].AccountExclude = transactionsAccountsExclude
}
subscriptionJson, err := json.Marshal(&subscription)
if err != nil {
log.Printf("Failed to marshal subscription request: %v", subscriptionJson)
}
log.Printf("Subscription request: %s", string(subscriptionJson))
// Set up the subscription request
ctx := context.Background()
if *token != "" {
md := metadata.New(map[string]string{"x-token": *token})
ctx = metadata.NewOutgoingContext(ctx, md)
}
stream, err := client.Subscribe(ctx)
if err != nil {
log.Fatalf("%v", err)
}
err = stream.Send(&subscription)
if err != nil {
log.Fatalf("%v", err)
}
stream.CloseSend()
for {
resp, err := stream.Recv()
timestamp := time.Now().UnixNano()
@ -74,18 +242,9 @@ func grpc_client() {
return
}
if err != nil {
panic(err)
log.Fatalf("Error occurred in receiving update: %v", err)
}
account := resp.GetAccount()
grpc_slot[account.Slot]++
if *accountData {
log.Printf("[GRPC] %d{%d}: %s @ %d", account.Slot, grpc_slot[account.Slot], base64.StdEncoding.EncodeToString(account.Account.Data), timestamp)
} else {
log.Printf("[GRPC] %d{%d}: %x @ %d", account.Slot, grpc_slot[account.Slot], md5.Sum(account.Account.Data), timestamp)
}
log.Printf("%v %v", timestamp, resp)
}
}

View File

@ -1,9 +1,16 @@
module github.com/rpcpool/solana-geyser-grpc/golang
go 1.16
go 1.17
require (
github.com/gagliardetto/solana-go v1.7.1
google.golang.org/grpc v1.50.1
golang.org/x/net v0.5.0 // indirect
google.golang.org/genproto v0.0.0-20230202175211-008b39050e57 // indirect
google.golang.org/grpc v1.52.3
google.golang.org/protobuf v1.28.1
)
require (
github.com/golang/protobuf v1.5.2 // indirect
golang.org/x/sys v0.4.0 // indirect
golang.org/x/text v0.6.0 // indirect
)

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff