// wormhole/node/cmd/spy/spy.go

// 323 lines
// 7.5 KiB
// Go
// Raw Normal View History

package spy
import (
"context"
"encoding/hex"
"fmt"
"net"
"net/http"
"os"
"sync"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
spyv1 "github.com/certusone/wormhole/node/pkg/proto/spy/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/google/uuid"
"github.com/gorilla/mux"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/spf13/cobra"
"go.uber.org/zap"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
var (
	// rootCtx is the process lifecycle context. runSpy creates it and blocks
	// until it is cancelled.
	rootCtx context.Context
	// rootCtxCancel cancels rootCtx. runSpy defers it and also passes it to
	// p2p.Run.
	rootCtxCancel context.CancelFunc
)
// Command-line flag values for SpyCmd. Each pointer is assigned in init, where
// the corresponding flag is registered with its default and help text.
var (
	// p2pNetworkID holds the value of --network.
	p2pNetworkID *string
	// p2pPort holds the value of --port.
	p2pPort *uint
	// p2pBootstrap holds the value of --bootstrap; runSpy requires it to be non-empty.
	p2pBootstrap *string
	// statusAddr holds the value of --statusAddr; the status server is skipped when blank.
	statusAddr *string
	// nodeKeyPath holds the value of --nodeKey; runSpy requires it to be non-empty.
	nodeKeyPath *string
	// logLevel holds the value of --logLevel.
	logLevel *string
	// spyRPC holds the value of --spyRPC, the listen address of the gRPC interface.
	spyRPC *string
)
// init registers the spy command's flags on SpyCmd and stores the returned
// value pointers in the package-level flag variables.
func init() {
	flags := SpyCmd.Flags()

	// P2P settings.
	p2pNetworkID = flags.String("network", "/wormhole/dev", "P2P network identifier")
	p2pPort = flags.Uint("port", 8999, "P2P UDP listener port")
	p2pBootstrap = flags.String("bootstrap", "", "P2P bootstrap peers (comma-separated)")

	// Local node settings.
	statusAddr = flags.String("statusAddr", "[::]:6060", "Listen address for status server (disabled if blank)")
	nodeKeyPath = flags.String("nodeKey", "", "Path to node key (will be generated if it doesn't exist)")
	logLevel = flags.String("logLevel", "info", "Logging level (debug, info, warn, error, dpanic, panic, fatal)")
	spyRPC = flags.String("spyRPC", "", "Listen address for gRPC interface")
}
// SpyCmd is the cobra command for the "spy" subcommand. Running it invokes
// runSpy, which starts the gossip spy client.
var SpyCmd = &cobra.Command{
	Use:   "spy",
	Short: "Run gossip spy client",
	Run:   runSpy,
}
// spyServer is the spy gRPC service implementation. It tracks the active
// SubscribeSignedVAA subscriptions and fans published VAAs out to them.
type spyServer struct {
	spyv1.UnimplementedSpyRPCServiceServer
	// logger is the server's named ("spyserver") logger.
	logger *zap.Logger
	// subs holds the active subscriptions, keyed by subscription ID.
	subs map[string]*subscription
	// subsMu guards subs. Publish holds it for the whole fan-out.
	subsMu sync.Mutex
}
// message is the unit handed from Publish to a subscription's channel.
type message struct {
	// vaaBytes is the VAA exactly as it was passed to Publish.
	vaaBytes []byte
}
// filter selects VAAs by emitter. A VAA matches when both its emitter chain
// and its emitter address equal the filter's fields.
type filter struct {
	chainId     vaa.ChainID
	emitterAddr vaa.Address
}
// subscription is one active SubscribeSignedVAA stream as tracked by spyServer.
type subscription struct {
	// filters selects which VAAs are delivered. When empty, Publish delivers
	// every VAA.
	filters []filter
	// ch carries VAAs from Publish to the stream handler.
	ch chan message
}
// subscriptionId returns a newly generated UUID in string form, used as the
// key under which a subscription is registered.
func subscriptionId() string {
	id := uuid.New()
	return id.String()
}
// decodeEmitterAddr parses a hex-encoded emitter address.
//
// The input must decode to exactly 32 bytes. Otherwise a gRPC status error
// with code InvalidArgument is returned together with the zero address.
func decodeEmitterAddr(hexAddr string) (vaa.Address, error) {
	var decoded vaa.Address

	raw, err := hex.DecodeString(hexAddr)
	switch {
	case err != nil:
		return vaa.Address{}, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err))
	case len(raw) != 32:
		return vaa.Address{}, status.Error(codes.InvalidArgument, "address must be 32 bytes")
	}

	copy(decoded[:], raw)
	return decoded, nil
}
// Publish fans a signed VAA out to every subscription that should receive it.
//
// Subscriptions without filters receive every VAA as raw bytes; the bytes are
// not parsed on their behalf. Subscriptions with filters receive the VAA only
// if its emitter chain and emitter address match one of their filters, and at
// most once per call even when several filters match. The VAA is parsed
// lazily, at most once per call, and only if a filtered subscription exists.
//
// If parsing fails, filtered subscriptions are skipped, unfiltered ones are
// still served, and the parse error is returned after the fan-out so the
// outcome does not depend on map iteration order.
//
// Sends block while subsMu is held, so a subscriber that is slow to drain its
// channel delays delivery to all others.
func (s *spyServer) Publish(vaaBytes []byte) error {
	s.subsMu.Lock()
	defer s.subsMu.Unlock()

	var (
		parsed   *vaa.VAA
		parseErr error
		tried    bool
	)

	for _, sub := range s.subs {
		if len(sub.filters) == 0 {
			sub.ch <- message{vaaBytes: vaaBytes}
			continue
		}

		if !tried {
			parsed, parseErr = vaa.Unmarshal(vaaBytes)
			tried = true
		}
		if parseErr != nil {
			// Filters cannot be evaluated without a parsed VAA; keep going so
			// unfiltered subscribers later in the iteration are still served.
			continue
		}

		for _, fi := range sub.filters {
			if fi.chainId == parsed.EmitterChain && fi.emitterAddr == parsed.EmitterAddress {
				sub.ch <- message{vaaBytes: vaaBytes}
				// One delivery per subscription is enough; duplicate filters
				// must not produce duplicate messages.
				break
			}
		}
	}

	return parseErr
}
// SubscribeSignedVAA handles a server-streaming subscription for signed VAAs.
//
// The request's filters are validated first. Only emitter filters are
// supported; any other filter type, or an emitter address that fails to
// decode, yields an InvalidArgument status error. The subscription is then
// registered and every message delivered on its channel is forwarded to the
// client until the stream context is done (its error is returned) or sending
// fails (the send error is returned). The subscription is removed on return.
func (s *spyServer) SubscribeSignedVAA(req *spyv1.SubscribeSignedVAARequest, resp spyv1.SpyRPCService_SubscribeSignedVAAServer) error {
	var fi []filter
	// Ranging over a nil slice is a no-op, so no separate nil check is needed.
	for _, f := range req.Filters {
		switch t := f.Filter.(type) {
		case *spyv1.FilterEntry_EmitterFilter:
			addr, err := decodeEmitterAddr(t.EmitterFilter.EmitterAddress)
			if err != nil {
				return status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode emitter address: %v", err))
			}
			fi = append(fi, filter{
				chainId:     vaa.ChainID(t.EmitterFilter.ChainId),
				emitterAddr: addr,
			})
		default:
			return status.Error(codes.InvalidArgument, "unsupported filter type")
		}
	}

	// Build the subscription before taking the lock to keep the critical
	// section minimal.
	id := subscriptionId()
	sub := &subscription{
		ch:      make(chan message, 1),
		filters: fi,
	}

	s.subsMu.Lock()
	s.subs[id] = sub
	s.subsMu.Unlock()

	defer func() {
		// Publish sends on sub.ch while holding subsMu. Once this handler has
		// stopped receiving, such a send would block forever and never release
		// the lock needed below, deadlocking the whole server. Keep draining
		// the channel until the subscription is gone from the map; after that
		// no sender can reach it anymore.
		removed := make(chan struct{})
		go func() {
			for {
				select {
				case <-sub.ch:
				case <-removed:
					return
				}
			}
		}()

		s.subsMu.Lock()
		delete(s.subs, id)
		s.subsMu.Unlock()
		close(removed)
	}()

	ctx := resp.Context()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case msg := <-sub.ch:
			if err := resp.Send(&spyv1.SubscribeSignedVAAResponse{
				VaaBytes: msg.vaaBytes,
			}); err != nil {
				return err
			}
		}
	}
}
// newSpyServer builds a spyServer with an empty subscription table and a
// logger derived from the given one under the name "spyserver".
func newSpyServer(logger *zap.Logger) *spyServer {
	srv := new(spyServer)
	srv.logger = logger.Named("spyserver")
	srv.subs = make(map[string]*subscription)
	return srv
}
// spyServerRunnable opens a TCP listener on listenAddr, creates an
// instrumented gRPC server with s registered as the spy RPC service, and
// returns a supervisor runnable that serves it along with the gRPC server
// itself.
//
// An error is returned only if the listener cannot be opened.
func spyServerRunnable(s *spyServer, logger *zap.Logger, listenAddr string) (supervisor.Runnable, *grpc.Server, error) {
	l, err := net.Listen("tcp", listenAddr)
	if err != nil {
		return nil, nil, fmt.Errorf("failed to listen: %w", err)
	}

	// Log the resolved address; it differs from listenAddr when the port was
	// left for the OS to choose.
	logger.Info("spy server listening", zap.String("addr", l.Addr().String()))

	grpcServer := common.NewInstrumentedGRPCServer(logger)
	spyv1.RegisterSpyRPCServiceServer(grpcServer, s)

	return supervisor.GRPCServer(grpcServer, l, false), grpcServer, nil
}
func runSpy(cmd *cobra.Command, args []string) {
common.SetRestrictiveUmask()
lvl, err := ipfslog.LevelFromString(*logLevel)
if err != nil {
fmt.Println("Invalid log level")
os.Exit(1)
}
logger := ipfslog.Logger("wormhole-spy").Desugar()
ipfslog.SetAllLoggers(lvl)
// Status server
if *statusAddr != "" {
router := mux.NewRouter()
router.Handle("/metrics", promhttp.Handler())
go func() {
logger.Info("status server listening on [::]:6060")
logger.Error("status server crashed", zap.Error(http.ListenAndServe(*statusAddr, router)))
}()
}
// Verify flags
if *nodeKeyPath == "" {
logger.Fatal("Please specify --nodeKey")
}
if *p2pBootstrap == "" {
logger.Fatal("Please specify --bootstrap")
}
// Node's main lifecycle context.
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
// Outbound gossip message queue
sendC := make(chan []byte)
// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
// Guardian set state managed by processor
gst := common.NewGuardianSetState()
// RPC server
s := newSpyServer(logger)
rpcSvc, _, err := spyServerRunnable(s, logger, *spyRPC)
if err != nil {
logger.Fatal("failed to start RPC server", zap.Error(err))
}
// Ignore observations
go func() {
for {
select {
case <-rootCtx.Done():
return
case <-obsvC:
}
}
}()
// Log signed VAAs
go func() {
for {
select {
case <-rootCtx.Done():
return
case v := <-signedInC:
logger.Info("Received signed VAA",
zap.Any("vaa", v.Vaa))
if err := s.Publish(v.Vaa); err != nil {
logger.Error("failed to publish signed VAA", zap.Error(err))
}
}
}
}()
// Load p2p private key
var priv crypto.PrivKey
priv, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath)
if err != nil {
logger.Fatal("Failed to load node key", zap.Error(err))
}
// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
Chain governor (#1277) * Rebase * Reload from db on start up Change-Id: I1deac9db28ad1157ea7e0c84af41c35b38497f4e * Console commands Change-Id: Ic242038312b7c837443a2df8823f100b3cdffd77 * Query prices from CoinGecko Change-Id: I9a8c282ba374d32ef7045d11979a27ede3c52827 * Move chain config to separate file Change-Id: I6a790eca765bce1f2caf48134e225df5c4daff15 * More code cleanup Change-Id: Id12affa78cdc2d394d6dab0c53bb7ad06df8007e * Few minor tweaks Change-Id: I6cb511599d669e0b3d716d9f314ac0f26935ee92 * Create separate tests for different packages Change-Id: Idb4da6817c9daad2a7420abc11bdaa702ae056dc * Fix lint errors Change-Id: I137c9e7e4574aee9c9fec22e91e19eee0e86a349 * Simplify chainlock message tests * Add more governor db test coverage * Next batch of review rework Change-Id: Ife54852fca6c6990d1ffb3d60a8dd7f49d526f0a * Still more rework Change-Id: I43a8aa7fa4e1a7cea4d7fde68c963123c1ca8d53 * More rework Change-Id: I9382412af4ffeda74967a834a6b0195a9d28b720 * Fix lint errors Change-Id: Idaafce9b0314192557b7207911375d000bac5ae2 * Add rest and prometheus support Change-Id: Ib870ed7eb305ef1ebbf6a7cedabc665c37c19171 * Add separate configs for testnet and devnet Change-Id: I76b11d8940a8dc9935b3f276a008ed20ef60b850 * Update mainnet tokens to fix decimals Change-Id: Iab018827766bc7748038b7be2f51342afb79b87c * Let small enqueued VAAs go out when big ones can't Change-Id: I7d3ef88d4579702d0c6ff4eaf5a8524799610ff6 * Tweak testnet config parameters Change-Id: Id2c54151a7183ab3fb4af8060929198f6021ba4e * Rework / enhancements from testnet testing Change-Id: I1387b1d22667fa6ffe0bb1832dbc0b31196505d3 * Use known emitter maps Change-Id: If330ee9d30ac3c2d1c6dca674f7777dc759de230 * Fix typo and out of date comments Change-Id: I54a19792104ccc6ca023020303a710ef3ba18f74 Co-authored-by: claudijd <jclaudius@jumptrading.com>
2022-07-19 11:08:06 -07:00
if err := supervisor.Run(ctx, "p2p", p2p.Run(obsvC, nil, nil, sendC, signedInC, priv, nil, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, "", false, rootCtxCancel, nil)); err != nil {
return err
}
if err := supervisor.Run(ctx, "spyrpc", rpcSvc); err != nil {
return err
}
logger.Info("Started internal services")
<-ctx.Done()
return nil
},
// It's safer to crash and restart the process in case we encounter a panic,
// rather than attempting to reschedule the runnable.
supervisor.WithPropagatePanic)
<-rootCtx.Done()
logger.Info("root context cancelled, exiting...")
// TODO: wait for things to shut down gracefully
}