2021-12-02 16:02:32 -08:00
package spy
import (
2022-11-09 08:39:57 -08:00
"bytes"
2021-12-02 16:02:32 -08:00
"context"
"fmt"
2022-06-21 12:18:16 -07:00
"net"
"net/http"
"os"
"sync"
2021-12-02 16:02:32 -08:00
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
2022-06-21 12:18:16 -07:00
spyv1 "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
2021-12-02 16:02:32 -08:00
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/google/uuid"
"github.com/gorilla/mux"
ipfslog "github.com/ipfs/go-log/v2"
2022-09-05 20:36:58 -07:00
"github.com/libp2p/go-libp2p/core/crypto"
2021-12-02 16:02:32 -08:00
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
2022-08-18 01:52:36 -07:00
"github.com/wormhole-foundation/wormhole/sdk/vaa"
2021-12-02 16:02:32 -08:00
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Process-wide lifecycle context for the spy. Both values are assigned in
// runSpy and are nil until then.
var (
	// rootCtx is cancelled when the spy should shut down.
	rootCtx context.Context
	// rootCtxCancel cancels rootCtx.
	rootCtxCancel context.CancelFunc
)
// Command-line flag values for the spy command. Each pointer is populated by
// init, which registers the corresponding flag on SpyCmd.
var (
	p2pNetworkID *string // --network: P2P network identifier
	p2pPort      *uint   // --port: P2P UDP listener port
	p2pBootstrap *string // --bootstrap: comma-separated bootstrap peers
	statusAddr   *string // --statusAddr: status server listen address (blank disables it)
	nodeKeyPath  *string // --nodeKey: path to the node key
	logLevel     *string // --logLevel: logging level name
	spyRPC       *string // --spyRPC: gRPC listen address
)
// init registers the spy command's flags on SpyCmd and stores the resulting
// value pointers in the package-level flag variables.
func init() {
	flags := SpyCmd.Flags()

	// P2P settings.
	p2pNetworkID = flags.String("network", "/wormhole/dev", "P2P network identifier")
	p2pPort = flags.Uint("port", 8999, "P2P UDP listener port")
	p2pBootstrap = flags.String("bootstrap", "", "P2P bootstrap peers (comma-separated)")

	// Local node settings.
	statusAddr = flags.String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
	nodeKeyPath = flags.String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
	logLevel = flags.String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
	spyRPC = flags.String("spyRPC", "", "Listen address for gRPC interface")
}
// SpyCmd is the cobra command ("spy") that runs the gossip spy client via
// runSpy.
var SpyCmd = &cobra.Command{
	Run:   runSpy,
	Use:   "spy",
	Short: "Run gossip spy client",
}
type spyServer struct {
spyv1 . UnimplementedSpyRPCServiceServer
2022-11-09 08:39:57 -08:00
logger * zap . Logger
subsSignedVaa map [ string ] * subscriptionSignedVaa
subsSignedVaaMu sync . Mutex
subsAllVaa map [ string ] * subscriptionAllVaa
subsAllVaaMu sync . Mutex
2021-12-02 16:02:32 -08:00
}
// message is the unit delivered to SubscribeSignedVAA subscribers.
type message struct {
	// vaaBytes is the VAA exactly as it was handed to PublishSignedVAA.
	vaaBytes []byte
}
2022-11-09 08:39:57 -08:00
type filterSignedVaa struct {
2021-12-02 16:02:32 -08:00
chainId vaa . ChainID
emitterAddr vaa . Address
}
2022-11-09 08:39:57 -08:00
type subscriptionSignedVaa struct {
filters [ ] filterSignedVaa
2021-12-02 16:02:32 -08:00
ch chan message
}
2022-11-09 08:39:57 -08:00
// subscriptionAllVaa is one active SubscribeSignedVAAByType stream.
type subscriptionAllVaa struct {
	// filters selects the VAAs to deliver; an empty list means "everything".
	filters []*spyv1.FilterEntry
	// ch carries matching responses from the gossip handlers to the stream
	// handler.
	ch chan *spyv1.SubscribeSignedVAAByTypeResponse
}
2021-12-02 16:02:32 -08:00
// subscriptionId returns a freshly generated UUID, in string form, used as the
// key of a new subscription.
func subscriptionId() string {
	id := uuid.New()
	return id.String()
}
2022-11-09 08:39:57 -08:00
func ( s * spyServer ) PublishSignedVAA ( vaaBytes [ ] byte ) error {
s . subsSignedVaaMu . Lock ( )
defer s . subsSignedVaaMu . Unlock ( )
var v * vaa . VAA
for _ , sub := range s . subsSignedVaa {
if len ( sub . filters ) == 0 {
sub . ch <- message { vaaBytes : vaaBytes }
continue
}
if v == nil {
var err error
v , err = vaa . Unmarshal ( vaaBytes )
if err != nil {
return err
}
}
for _ , fi := range sub . filters {
if fi . chainId == v . EmitterChain && fi . emitterAddr == v . EmitterAddress {
sub . ch <- message { vaaBytes : vaaBytes }
}
}
2021-12-02 16:02:32 -08:00
}
2022-11-09 08:39:57 -08:00
return nil
}
// TransactionIdMatches reports whether the transaction ID of the batch g is
// byte-for-byte equal to the transaction ID of the filter t.
func TransactionIdMatches(g *gossipv1.SignedBatchVAAWithQuorum, t *spyv1.BatchFilter) bool {
	batchTxID, filterTxID := g.TxId, t.TxId
	return bytes.Equal(batchTxID, filterTxID)
}
// BatchMatchFilter asserts that the obervation matches the values of the filter.
func BatchMatchesFilter ( g * gossipv1 . SignedBatchVAAWithQuorum , f * spyv1 . BatchFilter ) bool {
// check the chain ID
if g . ChainId != uint32 ( f . ChainId ) {
return false
}
// check the transaction ID
txMatch := TransactionIdMatches ( g , f )
if ! txMatch {
return false
2021-12-02 16:02:32 -08:00
}
2022-11-09 08:39:57 -08:00
// check the Nonce
if f . Nonce >= 1 {
// filter has a nonce, so make sure it matches
if g . Nonce != f . Nonce {
// filter's nonce does not match the nonce of the Batch.
return false
}
}
2021-12-02 16:02:32 -08:00
2022-11-09 08:39:57 -08:00
return true
2021-12-02 16:02:32 -08:00
}
2022-11-09 08:39:57 -08:00
// HandleGossipVAA compares a gossip message to client subscriptions & filters,
// and forwards the VAA to those requesting it.
func ( s * spyServer ) HandleGossipVAA ( g * gossipv1 . SignedVAAWithQuorum ) error {
s . subsAllVaaMu . Lock ( )
defer s . subsAllVaaMu . Unlock ( )
2021-12-02 16:02:32 -08:00
2022-11-09 08:39:57 -08:00
v , err := vaa . Unmarshal ( g . Vaa )
if err != nil {
s . logger . Error ( "failed unmarshaing VAA bytes from gossipv1.SignedVAAWithQuorum." ,
zap . Error ( err ) )
return err
}
// resType defines which oneof proto will be retuned - res type "SignedVaa" is *gossipv1.SignedVAAWithQuorum
resType := & spyv1 . SubscribeSignedVAAByTypeResponse_SignedVaa {
SignedVaa : g ,
}
// envelope is the highest level proto struct, the wrapper proto that contains one of the VAA types.
envelope := & spyv1 . SubscribeSignedVAAByTypeResponse {
VaaType : resType ,
}
2021-12-02 16:02:32 -08:00
2022-11-09 08:39:57 -08:00
// loop through the subscriptions and send responses to everyone that wants this VAA
for _ , sub := range s . subsAllVaa {
2021-12-02 16:02:32 -08:00
if len ( sub . filters ) == 0 {
2022-11-09 08:39:57 -08:00
// this subscription has no filters, send them the VAA.
sub . ch <- envelope
continue
}
// this subscription has filters.
for _ , filterEntry := range sub . filters {
filter := filterEntry . GetFilter ( )
switch t := filter . ( type ) {
case * spyv1 . FilterEntry_EmitterFilter :
filterAddr := t . EmitterFilter . EmitterAddress
filterChain := vaa . ChainID ( t . EmitterFilter . ChainId )
if v . EmitterChain == filterChain && v . EmitterAddress . String ( ) == filterAddr {
// it is a match, send the response
sub . ch <- envelope
2021-12-02 16:02:32 -08:00
}
2022-11-09 08:39:57 -08:00
default :
panic ( fmt . Sprintf ( "unsupported filter type in subscriptions: %T" , filter ) )
2021-12-02 16:02:32 -08:00
}
2022-11-09 08:39:57 -08:00
}
}
return nil
}
// HandleGossipBatchVAA compares a gossip message to client subscriptions & filters,
// and forwards the VAA to those requesting it.
func ( s * spyServer ) HandleGossipBatchVAA ( g * gossipv1 . SignedBatchVAAWithQuorum ) error {
s . subsAllVaaMu . Lock ( )
defer s . subsAllVaaMu . Unlock ( )
b , err := vaa . UnmarshalBatch ( g . BatchVaa )
if err != nil {
s . logger . Error ( "failed unmarshaing BatchVAA bytes from gossipv1.SignedBatchVAAWithQuorum." ,
zap . Error ( err ) )
return err
}
// resType defines which oneof proto will be retuned -
// res type "SignedBatchVaa" is *gossipv1.SignedBatchVAAWithQuorum
resType := & spyv1 . SubscribeSignedVAAByTypeResponse_SignedBatchVaa {
SignedBatchVaa : g ,
}
2021-12-02 16:02:32 -08:00
2022-11-09 08:39:57 -08:00
// envelope is the highest level proto struct, the wrapper proto that contains one of the VAA types.
envelope := & spyv1 . SubscribeSignedVAAByTypeResponse {
VaaType : resType ,
}
// loop through the subscriptions and send responses to everyone that wants this VAA
for _ , sub := range s . subsAllVaa {
if len ( sub . filters ) == 0 {
// this subscription has no filters, send them the VAA.
sub . ch <- envelope
continue
}
// this subscription has filters.
for _ , filterEntry := range sub . filters {
filter := filterEntry . GetFilter ( )
switch t := filter . ( type ) {
case * spyv1 . FilterEntry_EmitterFilter :
filterChain := uint32 ( t . EmitterFilter . ChainId )
if g . ChainId != filterChain {
// VAA does not pass the filter
continue
}
// BatchVAAs do not have EmitterAddress at the top level - each Observation
// in the Batch has an EmitterAddress.
// In order to make it easier for integrators, allow subscribing to BatchVAAs by
// EmitterFilter. Send BatchVAAs to subscriptions with an EmitterFilter that
// matches 1 (or more) Obervation(s) in the batch.
filterAddr := t . EmitterFilter . EmitterAddress
// check each Observation to see if it meets the criteria of the filter.
for _ , obs := range b . Observations {
if obs . Observation . EmitterAddress . String ( ) == filterAddr {
// it is a match, send the response to the subscriber.
sub . ch <- envelope
break
}
}
case * spyv1 . FilterEntry_BatchFilter :
if BatchMatchesFilter ( g , t . BatchFilter ) {
sub . ch <- envelope
2021-12-02 16:02:32 -08:00
}
2022-11-09 08:39:57 -08:00
case * spyv1 . FilterEntry_BatchTransactionFilter :
// make a BatchFilter struct from the BatchTransactionFilter since the latter is
// a subset of the former's properties, so we can use TransactionIdMatches.
batchFilter := & spyv1 . BatchFilter {
ChainId : t . BatchTransactionFilter . ChainId ,
TxId : t . BatchTransactionFilter . TxId ,
}
if BatchMatchesFilter ( g , batchFilter ) {
sub . ch <- envelope
}
default :
panic ( fmt . Sprintf ( "unsupported filter type in subscriptions: %T" , filter ) )
2021-12-02 16:02:32 -08:00
}
}
}
return nil
}
func ( s * spyServer ) SubscribeSignedVAA ( req * spyv1 . SubscribeSignedVAARequest , resp spyv1 . SpyRPCService_SubscribeSignedVAAServer ) error {
2022-11-09 08:39:57 -08:00
var fi [ ] filterSignedVaa
2021-12-02 16:02:32 -08:00
if req . Filters != nil {
for _ , f := range req . Filters {
switch t := f . Filter . ( type ) {
case * spyv1 . FilterEntry_EmitterFilter :
2022-11-09 08:39:57 -08:00
addr , err := vaa . StringToAddress ( t . EmitterFilter . EmitterAddress )
2021-12-02 16:02:32 -08:00
if err != nil {
return status . Error ( codes . InvalidArgument , fmt . Sprintf ( "failed to decode emitter address: %v" , err ) )
}
2022-11-09 08:39:57 -08:00
fi = append ( fi , filterSignedVaa {
2021-12-02 16:02:32 -08:00
chainId : vaa . ChainID ( t . EmitterFilter . ChainId ) ,
emitterAddr : addr ,
} )
default :
return status . Error ( codes . InvalidArgument , "unsupported filter type" )
}
}
}
2022-11-09 08:39:57 -08:00
s . subsSignedVaaMu . Lock ( )
2021-12-02 16:02:32 -08:00
id := subscriptionId ( )
2022-11-09 08:39:57 -08:00
sub := & subscriptionSignedVaa {
2021-12-02 16:02:32 -08:00
ch : make ( chan message , 1 ) ,
filters : fi ,
}
2022-11-09 08:39:57 -08:00
s . subsSignedVaa [ id ] = sub
s . subsSignedVaaMu . Unlock ( )
2021-12-02 16:02:32 -08:00
defer func ( ) {
2022-11-09 08:39:57 -08:00
s . subsSignedVaaMu . Lock ( )
defer s . subsSignedVaaMu . Unlock ( )
delete ( s . subsSignedVaa , id )
2021-12-02 16:02:32 -08:00
} ( )
for {
select {
case <- resp . Context ( ) . Done ( ) :
return resp . Context ( ) . Err ( )
case msg := <- sub . ch :
if err := resp . Send ( & spyv1 . SubscribeSignedVAAResponse {
VaaBytes : msg . vaaBytes ,
} ) ; err != nil {
return err
}
}
}
}
2022-11-09 08:39:57 -08:00
// SubscribeSignedVAAByType serves one SubscribeSignedVAAByType stream. Each new
// subscription adds a channel and the request's filters to the map of active
// subscriptions.
//
// Emitter filters, batch filters and batch transaction filters are accepted.
// An emitter filter whose address cannot be decoded, or any other filter type,
// is rejected with codes.InvalidArgument. Every response published to the
// subscription is forwarded to the client until the stream's context ends or a
// send fails, at which point the subscription is removed again.
//
// Returns the context error when the stream's context is done, or the error
// from resp.Send.
func (s *spyServer) SubscribeSignedVAAByType(req *spyv1.SubscribeSignedVAAByTypeRequest, resp spyv1.SpyRPCService_SubscribeSignedVAAByTypeServer) error {
	var fi []*spyv1.FilterEntry
	for _, f := range req.Filters {
		switch t := f.Filter.(type) {
		case *spyv1.FilterEntry_EmitterFilter:
			// validate the emitter address by decoding it
			if _, err := vaa.StringToAddress(t.EmitterFilter.EmitterAddress); err != nil {
				return status.Errorf(codes.InvalidArgument, "failed to decode emitter address: %v", err)
			}
			fi = append(fi, &spyv1.FilterEntry{Filter: t})
		case *spyv1.FilterEntry_BatchFilter,
			*spyv1.FilterEntry_BatchTransactionFilter:
			fi = append(fi, &spyv1.FilterEntry{Filter: t})
		default:
			return status.Error(codes.InvalidArgument, "unsupported filter type")
		}
	}

	id := subscriptionId()
	sub := &subscriptionAllVaa{
		ch:      make(chan *spyv1.SubscribeSignedVAAByTypeResponse, 1),
		filters: fi,
	}

	s.subsAllVaaMu.Lock()
	s.subsAllVaa[id] = sub
	s.subsAllVaaMu.Unlock()

	defer func() {
		// The gossip handlers send to sub.ch while holding subsAllVaaMu. Once
		// this handler stops receiving, such a send would block forever and
		// the Lock below would never succeed. Keep draining the channel until
		// the subscription has been removed; the handlers only reach sub
		// through the map, so nothing is sent to it afterwards.
		stop := make(chan struct{})
		go func() {
			for {
				select {
				case <-sub.ch:
				case <-stop:
					return
				}
			}
		}()

		s.subsAllVaaMu.Lock()
		delete(s.subsAllVaa, id)
		s.subsAllVaaMu.Unlock()

		close(stop)
	}()

	for {
		select {
		case <-resp.Context().Done():
			return resp.Context().Err()
		case msg := <-sub.ch:
			if err := resp.Send(msg); err != nil {
				return err
			}
		}
	}
}
2021-12-02 16:02:32 -08:00
func newSpyServer ( logger * zap . Logger ) * spyServer {
return & spyServer {
2022-11-09 08:39:57 -08:00
logger : logger . Named ( "spyserver" ) ,
subsSignedVaa : make ( map [ string ] * subscriptionSignedVaa ) ,
subsAllVaa : make ( map [ string ] * subscriptionAllVaa ) ,
2021-12-02 16:02:32 -08:00
}
}
func spyServerRunnable ( s * spyServer , logger * zap . Logger , listenAddr string ) ( supervisor . Runnable , * grpc . Server , error ) {
l , err := net . Listen ( "tcp" , listenAddr )
if err != nil {
return nil , nil , fmt . Errorf ( "failed to listen: %w" , err )
}
2022-11-09 08:39:57 -08:00
logger . Info ( "spy server listening" , zap . String ( "addr" , l . Addr ( ) . String ( ) ) )
2021-12-02 16:02:32 -08:00
grpcServer := common . NewInstrumentedGRPCServer ( logger )
spyv1 . RegisterSpyRPCServiceServer ( grpcServer , s )
return supervisor . GRPCServer ( grpcServer , l , false ) , grpcServer , nil
}
func runSpy ( cmd * cobra . Command , args [ ] string ) {
common . SetRestrictiveUmask ( )
lvl , err := ipfslog . LevelFromString ( * logLevel )
if err != nil {
fmt . Println ( "Invalid log level" )
os . Exit ( 1 )
}
logger := ipfslog . Logger ( "wormhole-spy" ) . Desugar ( )
ipfslog . SetAllLoggers ( lvl )
// Status server
if * statusAddr != "" {
router := mux . NewRouter ( )
router . Handle ( "/metrics" , promhttp . Handler ( ) )
go func ( ) {
logger . Info ( "status server listening on [::]:6060" )
2022-11-28 05:48:27 -08:00
logger . Error ( "status server crashed" , zap . Error ( http . ListenAndServe ( * statusAddr , router ) ) ) // #nosec G114 local status server not vulnerable to DoS attack
2021-12-02 16:02:32 -08:00
} ( )
}
// Verify flags
if * nodeKeyPath == "" {
logger . Fatal ( "Please specify --nodeKey" )
}
if * p2pBootstrap == "" {
logger . Fatal ( "Please specify --bootstrap" )
}
// Node's main lifecycle context.
rootCtx , rootCtxCancel = context . WithCancel ( context . Background ( ) )
defer rootCtxCancel ( )
// Outbound gossip message queue
sendC := make ( chan [ ] byte )
// Inbound observations
obsvC := make ( chan * gossipv1 . SignedObservation , 50 )
2022-09-15 12:48:57 -07:00
// Inbound observation requests
obsvReqC := make ( chan * gossipv1 . ObservationRequest , 50 )
2021-12-02 16:02:32 -08:00
// Inbound signed VAAs
signedInC := make ( chan * gossipv1 . SignedVAAWithQuorum , 50 )
// Guardian set state managed by processor
2022-09-06 16:36:02 -07:00
gst := common . NewGuardianSetState ( nil )
2021-12-02 16:02:32 -08:00
// RPC server
s := newSpyServer ( logger )
rpcSvc , _ , err := spyServerRunnable ( s , logger , * spyRPC )
if err != nil {
logger . Fatal ( "failed to start RPC server" , zap . Error ( err ) )
}
// Ignore observations
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
case <- obsvC :
}
}
} ( )
2022-09-15 12:48:57 -07:00
// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
case <- obsvReqC :
}
}
} ( )
2021-12-02 16:02:32 -08:00
// Log signed VAAs
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
case v := <- signedInC :
logger . Info ( "Received signed VAA" ,
zap . Any ( "vaa" , v . Vaa ) )
2022-11-09 08:39:57 -08:00
if err := s . PublishSignedVAA ( v . Vaa ) ; err != nil {
2021-12-02 16:02:32 -08:00
logger . Error ( "failed to publish signed VAA" , zap . Error ( err ) )
}
2022-11-09 08:39:57 -08:00
if err := s . HandleGossipVAA ( v ) ; err != nil {
logger . Error ( "failed to HandleGossipVAA" , zap . Error ( err ) )
}
2021-12-02 16:02:32 -08:00
}
}
} ( )
// Load p2p private key
var priv crypto . PrivKey
priv , err = common . GetOrCreateNodeKey ( logger , * nodeKeyPath )
if err != nil {
logger . Fatal ( "Failed to load node key" , zap . Error ( err ) )
}
// Run supervisor.
supervisor . New ( rootCtx , logger , func ( ctx context . Context ) error {
2023-01-16 04:33:01 -08:00
if err := supervisor . Run ( ctx , "p2p" , p2p . Run ( obsvC , obsvReqC , nil , sendC , signedInC , priv , nil , gst , * p2pPort , * p2pNetworkID , * p2pBootstrap , "" , false , rootCtxCancel , nil , nil , nil , nil ) ) ; err != nil {
2021-12-02 16:02:32 -08:00
return err
}
if err := supervisor . Run ( ctx , "spyrpc" , rpcSvc ) ; err != nil {
return err
}
logger . Info ( "Started internal services" )
<- ctx . Done ( )
return nil
} ,
// It's safer to crash and restart the process in case we encounter a panic,
// rather than attempting to reschedule the runnable.
supervisor . WithPropagatePanic )
<- rootCtx . Done ( )
logger . Info ( "root context cancelled, exiting..." )
// TODO: wait for things to shut down gracefully
}