diff --git a/bridge/cmd/guardiand/adminclient.go b/bridge/cmd/guardiand/adminclient.go index a6b629818..663a673be 100644 --- a/bridge/cmd/guardiand/adminclient.go +++ b/bridge/cmd/guardiand/adminclient.go @@ -31,12 +31,10 @@ func init() { } AdminClientInjectGuardianSetUpdateCmd.Flags().AddFlagSet(pf) - AdminClientListNodesStream.Flags().AddFlagSet(pf) AdminClientListNodes.Flags().AddFlagSet(pf) AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd) AdminCmd.AddCommand(AdminClientGovernanceVAAVerifyCmd) - AdminCmd.AddCommand(AdminClientListNodesStream) AdminCmd.AddCommand(AdminClientListNodes) } diff --git a/bridge/cmd/guardiand/adminnodes.go b/bridge/cmd/guardiand/adminnodes.go index 78861bfdd..61c9752e9 100644 --- a/bridge/cmd/guardiand/adminnodes.go +++ b/bridge/cmd/guardiand/adminnodes.go @@ -6,7 +6,6 @@ import ( publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/bridge/pkg/vaa" "github.com/spf13/cobra" - "io" "log" "os" "sort" @@ -17,48 +16,6 @@ import ( // How to test in container: // kubectl exec guardian-0 -- /guardiand admin list-nodes --socket /tmp/admin.sock -var AdminClientListNodesStream = &cobra.Command{ - Use: "list-nodes-stream", - Short: "Listens to heartbeats and displays an aggregated real-time list of guardian nodes", - Run: runListNodesStream, -} - -func runListNodesStream(cmd *cobra.Command, args []string) { - ctx := context.Background() - conn, err, c := getPublicrpcClient(ctx, *clientSocketPath) - defer conn.Close() - if err != nil { - log.Fatalf("failed to get publicrpc client: %v", err) - } - - stream, err := c.GetRawHeartbeats(ctx, &publicrpcv1.GetRawHeartbeatsRequest{}) - if err != nil { - log.Fatalf("failed to stream heartbeats: %v", err) - } - - log.Print("connected, streaming updates") - - seen := make(map[string]bool) - w := tabwriter.NewWriter(os.Stdout, 20, 8, 1, '\t', 0) - for { - hb, err := stream.Recv() - if err == io.EOF { - log.Print("server closed connection, exiting") - return - } else if err != nil { - log.Fatalf("error streaming updates: %v", err) - } - - if seen[hb.GuardianAddr] { - continue - } - - fmt.Fprintf(w, "%s\t%s\t%s\t\n", hb.GuardianAddr, hb.NodeName, hb.Version) - w.Flush() - seen[hb.GuardianAddr] = true - } -} - var ( showDetails bool ) diff --git a/bridge/cmd/guardiand/adminserver.go b/bridge/cmd/guardiand/adminserver.go index 6a99c4260..a150dca65 100644 --- a/bridge/cmd/guardiand/adminserver.go +++ b/bridge/cmd/guardiand/adminserver.go @@ -136,7 +136,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil } -func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { +func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { // Delete existing UNIX socket, if present. fi, err := os.Stat(socketPath) if err == nil { @@ -170,7 +170,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- logger: logger.Named("adminservice"), } - publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db, gst) + publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst) grpcServer := newGRPCServer(logger) nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService) diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index be945e473..c949d0658 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -31,7 +31,6 @@ import ( "github.com/certusone/wormhole/bridge/pkg/p2p" "github.com/certusone/wormhole/bridge/pkg/processor" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" - "github.com/certusone/wormhole/bridge/pkg/publicrpc" "github.com/certusone/wormhole/bridge/pkg/readiness" solana "github.com/certusone/wormhole/bridge/pkg/solana" "github.com/certusone/wormhole/bridge/pkg/supervisor" @@ -432,15 +431,13 @@ func runBridge(cmd *cobra.Command, args []string) { } } - // subscriber channel multiplexing for public gPRC streams - rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger) - publicrpcService, publicrpcServer, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db, gst) + publicrpcService, publicrpcServer, err := publicrpcServiceRunnable(logger, *publicRPC, db, gst) if err != nil { log.Fatal("failed to create publicrpc service socket", zap.Error(err)) } // local admin service socket - adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db, gst) + adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, db, gst) if err != nil { logger.Fatal("failed to create admin service socket", zap.Error(err)) } @@ -454,7 +451,7 @@ func runBridge(cmd *cobra.Command, args []string) { // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, sendC, rawHeartbeatListeners, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil { + obsvC, sendC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil { return err } diff --git a/bridge/cmd/guardiand/publicrpc.go b/bridge/cmd/guardiand/publicrpc.go index cd19417ed..93300489f 100644 --- a/bridge/cmd/guardiand/publicrpc.go +++ b/bridge/cmd/guardiand/publicrpc.go @@ -12,7 +12,7 @@ import ( "net" ) -func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, *grpc.Server, error) { +func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, *grpc.Server, error) { l, err := net.Listen("tcp", listenAddr) if err != nil { return nil, nil, fmt.Errorf("failed to listen: %w", err) @@ -20,7 +20,7 @@ func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicr logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String())) - rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db, gst) + rpcServer := publicrpc.NewPublicrpcServer(logger, db, gst) grpcServer := newGRPCServer(logger) publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer) diff --git a/bridge/pkg/p2p/p2p.go b/bridge/pkg/p2p/p2p.go index 42518bcfb..868e14727 100644 --- a/bridge/pkg/p2p/p2p.go +++ b/bridge/pkg/p2p/p2p.go @@ -32,7 +32,6 @@ import ( "google.golang.org/protobuf/proto" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" - "github.com/certusone/wormhole/bridge/pkg/publicrpc" "github.com/certusone/wormhole/bridge/pkg/supervisor" ) @@ -60,7 +59,7 @@ func heartbeatDigest(b []byte) common.Hash { return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...)) } -func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { +func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error { return func(ctx context.Context) (re error) { logger := supervisor.Logger(ctx) @@ -208,7 +207,6 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat } ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey) - rawHeartbeatListeners.PublishHeartbeat(heartbeat) if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { panic(err) } @@ -318,7 +316,6 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat logger.Debug("valid signed heartbeat received", zap.Any("value", heartbeat), zap.String("from", envelope.GetFrom().String())) - rawHeartbeatListeners.PublishHeartbeat(heartbeat) } case *gossipv1.GossipMessage_SignedObservation: obsvC <- m.SignedObservation diff --git a/bridge/pkg/publicrpc/publicrpcserver.go b/bridge/pkg/publicrpc/publicrpcserver.go index ae18b0b69..07bf2d9f3 100644 --- a/bridge/pkg/publicrpc/publicrpcserver.go +++ b/bridge/pkg/publicrpc/publicrpcserver.go @@ -6,7 +6,6 @@ import ( "fmt" "github.com/certusone/wormhole/bridge/pkg/common" "github.com/certusone/wormhole/bridge/pkg/db" - gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/bridge/pkg/vaa" "go.uber.org/zap" @@ -17,23 +16,20 @@ import ( // PublicrpcServer implements the publicrpc gRPC service. type PublicrpcServer struct { publicrpcv1.UnimplementedPublicrpcServer - rawHeartbeatListeners *RawHeartbeatConns - logger *zap.Logger - db *db.Database - gst *common.GuardianSetState + logger *zap.Logger + db *db.Database + gst *common.GuardianSetState } func NewPublicrpcServer( logger *zap.Logger, - rawHeartbeatListeners *RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState, ) *PublicrpcServer { return &PublicrpcServer{ - rawHeartbeatListeners: rawHeartbeatListeners, - logger: logger.Named("publicrpcserver"), - db: db, - gst: gst, + logger: logger.Named("publicrpcserver"), + db: db, + gst: gst, } } @@ -62,27 +58,6 @@ func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv return resp, nil } -func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error { - s.logger.Info("gRPC heartbeat stream opened by client") - - // create a channel and register it for heartbeats - receiveChan := make(chan *gossipv1.Heartbeat, 50) - // clientId is the reference to the subscription that we will use for unsubscribing when the client disconnects. - clientId := s.rawHeartbeatListeners.subscribeHeartbeats(receiveChan) - - for { - select { - // Exit on stream context done - case <-stream.Context().Done(): - s.logger.Info("raw heartbeat stream closed by client", zap.Int("clientId", clientId)) - s.rawHeartbeatListeners.unsubscribeHeartbeats(clientId) - return stream.Context().Err() - case msg := <-receiveChan: - stream.Send(msg) - } - } -} - func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.GetSignedVAARequest) (*publicrpcv1.GetSignedVAAResponse, error) { address, err := hex.DecodeString(req.MessageId.EmitterAddress) if err != nil { diff --git a/bridge/pkg/publicrpc/rawheartbeats.go b/bridge/pkg/publicrpc/rawheartbeats.go deleted file mode 100644 index 8b8eb4b9a..000000000 --- a/bridge/pkg/publicrpc/rawheartbeats.go +++ /dev/null @@ -1,84 +0,0 @@ -package publicrpc - -import ( - gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" - "github.com/prometheus/client_golang/prometheus/promauto" - "math/rand" - "sync" - - "github.com/prometheus/client_golang/prometheus" - "go.uber.org/zap" -) - -// track the number of active connections -var ( - currentPublicHeartbeatStreamsOpen = promauto.NewGauge( - prometheus.GaugeOpts{ - Name: "wormhole_publicrpc_rawheartbeat_connections", - Help: "Current number of clients consuming gRPC raw heartbeat streams", - }) -) - -// RawHeartbeatConns holds the multiplexing state required for distribution of -// heartbeat messages to all the open connections. -type RawHeartbeatConns struct { - mu sync.RWMutex - subs map[int]chan<- *gossipv1.Heartbeat - logger *zap.Logger -} - -func HeartbeatStreamMultiplexer(logger *zap.Logger) *RawHeartbeatConns { - ps := &RawHeartbeatConns{ - subs: map[int]chan<- *gossipv1.Heartbeat{}, - logger: logger.Named("heartbeatmultiplexer"), - } - return ps -} - -// getUniqueClientId loops to generate & test integers for existence as key of map. returns an int that is not a key in map. -func (ps *RawHeartbeatConns) getUniqueClientId() int { - clientId := rand.Intn(1e6) - found := false - for found { - clientId = rand.Intn(1e6) - _, found = ps.subs[clientId] - } - return clientId -} - -// subscribeHeartbeats adds a channel to the subscriber map, keyed by arbitrary clientId -func (ps *RawHeartbeatConns) subscribeHeartbeats(ch chan *gossipv1.Heartbeat) int { - ps.mu.Lock() - defer ps.mu.Unlock() - - clientId := ps.getUniqueClientId() - ps.logger.Info("subscribeHeartbeats for client", zap.Int("client", clientId)) - ps.subs[clientId] = ch - currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs))) - return clientId -} - -// PublishHeartbeat sends a message to all channels in the subscription map -func (ps *RawHeartbeatConns) PublishHeartbeat(msg *gossipv1.Heartbeat) { - ps.mu.RLock() - defer ps.mu.RUnlock() - - for client, ch := range ps.subs { - select { - case ch <- msg: - ps.logger.Debug("published message to client", zap.Int("client", client)) - default: - ps.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client)) - } - } -} - -// unsubscribeHeartbeats removes the client's channel from the subscription map -func (ps *RawHeartbeatConns) unsubscribeHeartbeats(clientId int) { - ps.mu.Lock() - defer ps.mu.Unlock() - - ps.logger.Debug("unsubscribeHeartbeats for client", zap.Int("clientId", clientId)) - delete(ps.subs, clientId) - currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs))) -} diff --git a/proto/publicrpc/v1/publicrpc.proto b/proto/publicrpc/v1/publicrpc.proto index b030b9a25..947baf40a 100644 --- a/proto/publicrpc/v1/publicrpc.proto +++ b/proto/publicrpc/v1/publicrpc.proto @@ -27,15 +27,6 @@ message MessageID { // Publicrpc service exposes endpoints to be consumed externally; GUIs, historical record keeping, etc. service Publicrpc { - // GetRawHeartbeats rpc endpoint returns a stream of the p2p heartbeat messages received. - // The GetRawHeartbeats stream will include all messages received by the guardian, - // without any filtering or verification of message content. - rpc GetRawHeartbeats (GetRawHeartbeatsRequest) returns (stream gossip.v1.Heartbeat) { - option (google.api.http) = { - get: "/v1/heartbeats:stream_raw" - }; - }; - // GetLastHeartbeats returns the last heartbeat received for each guardian node in the // node's active guardian set. Heartbeats received by nodes not in the guardian set are ignored. // The heartbeat value is null if no heartbeat has yet been received. @@ -59,9 +50,6 @@ service Publicrpc { } -message GetRawHeartbeatsRequest { -} - message GetSignedVAARequest { MessageID message_id = 1; }