From 16157d339d1015d68cfa723535da5e801ac9ee61 Mon Sep 17 00:00:00 2001 From: jschuldt Date: Thu, 13 May 2021 00:57:08 -0500 Subject: [PATCH] Add publicrpc endpoint for external clients. - Distribute raw heartbeats via new proto package publicrpc - Manage channel subscription on client req/close. - Expose publicprc endpoint in devnet Service. Change-Id: Ic96d624733961aa56e00b03c3b5cff6af11523a4 --- bridge/cmd/guardiand/bridge.go | 16 ++++- bridge/pkg/p2p/p2p.go | 5 ++ bridge/pkg/publicrpc/publicrpcserver.go | 57 ++++++++++++++++ bridge/pkg/publicrpc/rawheartbeats.go | 87 +++++++++++++++++++++++++ devnet/bridge.yaml | 8 +++ proto/gossip/v1/gossip.proto | 3 +- proto/publicrpc/v1/publicrpc.proto | 22 +++++++ 7 files changed, 196 insertions(+), 2 deletions(-) create mode 100644 bridge/pkg/publicrpc/publicrpcserver.go create mode 100644 bridge/pkg/publicrpc/rawheartbeats.go create mode 100644 proto/publicrpc/v1/publicrpc.proto diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index 7fd01af34..748456686 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -26,6 +26,7 @@ import ( "github.com/certusone/wormhole/bridge/pkg/p2p" "github.com/certusone/wormhole/bridge/pkg/processor" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/bridge/pkg/publicrpc" "github.com/certusone/wormhole/bridge/pkg/readiness" solana "github.com/certusone/wormhole/bridge/pkg/solana" "github.com/certusone/wormhole/bridge/pkg/supervisor" @@ -71,6 +72,8 @@ var ( unsafeDevMode *bool devNumGuardians *uint nodeName *string + + publicRPC *string ) func init() { @@ -108,6 +111,8 @@ func init() { unsafeDevMode = BridgeCmd.Flags().Bool("unsafeDevMode", false, "Launch node in unsafe, deterministic devnet mode") devNumGuardians = BridgeCmd.Flags().Uint("devNumGuardians", 5, "Number of devnet guardians to include in guardian set") nodeName = BridgeCmd.Flags().String("nodeName", "", "Node name to announce in gossip heartbeats") + + publicRPC = BridgeCmd.Flags().String("publicRPC", "", "Listen address for public gRPC interface") } var ( @@ -380,10 +385,13 @@ func runBridge(cmd *cobra.Command, args []string) { logger.Fatal("failed to create admin service socket", zap.Error(err)) } + // subscriber channel multiplexing for public gPRC streams + rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger) + // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { if err := supervisor.Run(ctx, "p2p", p2p.Run( - obsvC, sendC, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil { + obsvC, sendC, rawHeartbeatListeners, priv, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, rootCtxCancel)); err != nil { return err } @@ -436,6 +444,12 @@ func runBridge(cmd *cobra.Command, args []string) { if err := supervisor.Run(ctx, "admin", adminService); err != nil { return err } + if *publicRPC != "" { + if err := supervisor.Run(ctx, "publicrpc", + publicrpc.PublicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners)); err != nil { + return err + } + } logger.Info("Started internal services") diff --git a/bridge/pkg/p2p/p2p.go b/bridge/pkg/p2p/p2p.go index 2eeedb0c3..87a78ddd7 100644 --- a/bridge/pkg/p2p/p2p.go +++ b/bridge/pkg/p2p/p2p.go @@ -25,6 +25,7 @@ import ( "google.golang.org/protobuf/proto" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" + "github.com/certusone/wormhole/bridge/pkg/publicrpc" "github.com/certusone/wormhole/bridge/pkg/supervisor" ) @@ -54,6 +55,7 @@ func init() { func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, + rawHeartbeatListeners *publicrpc.PublicRawHeartbeatConnections, priv crypto.PrivKey, port uint, networkID string, @@ -203,6 +205,8 @@ func Run(obsvC chan *gossipv1.SignedObservation, GuardianAddr: DefaultRegistry.guardianAddress, }}} + rawHeartbeatListeners.PublishHeartbeat(msg.GetHeartbeat()) + b, err := proto.Marshal(&msg) if err != nil { panic(err) @@ -268,6 +272,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, logger.Debug("heartbeat received", zap.Any("value", m.Heartbeat), zap.String("from", envelope.GetFrom().String())) + rawHeartbeatListeners.PublishHeartbeat(msg.GetHeartbeat()) p2pMessagesReceived.WithLabelValues("heartbeat").Inc() case *gossipv1.GossipMessage_SignedObservation: obsvC <- m.SignedObservation diff --git a/bridge/pkg/publicrpc/publicrpcserver.go b/bridge/pkg/publicrpc/publicrpcserver.go new file mode 100644 index 000000000..4d3aa58af --- /dev/null +++ b/bridge/pkg/publicrpc/publicrpcserver.go @@ -0,0 +1,57 @@ +package publicrpc + +import ( + "fmt" + "net" + + "go.uber.org/zap" + "google.golang.org/grpc" + + publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" + "github.com/certusone/wormhole/bridge/pkg/supervisor" +) + +// gRPC server & method for handling streaming proto connection +type publicrpcServer struct { + publicrpcv1.UnimplementedPublicrpcServer + rawHeartbeatListeners *PublicRawHeartbeatConnections + logger *zap.Logger +} + +func (s *publicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error { + s.logger.Info("gRPC heartbeat stream opened by client") + + // create a channel and register it for heartbeats + receiveChan := make(chan *publicrpcv1.Heartbeat, 50) + // clientId is the reference to the subscription that we will use for unsubscribing when the client disconnects. + clientId := s.rawHeartbeatListeners.subscribeHeartbeats(receiveChan) + + for { + select { + // Exit on stream context done + case <-stream.Context().Done(): + s.logger.Info("raw heartbeat stream closed by client", zap.Int("clientId", clientId)) + s.rawHeartbeatListeners.unsubscribeHeartbeats(clientId) + return stream.Context().Err() + case msg := <-receiveChan: + stream.Send(msg) + } + } +} + +func PublicrpcServiceRunnable(logger *zap.Logger, listenAddr string, rawHeartbeatListeners *PublicRawHeartbeatConnections) supervisor.Runnable { + l, err := net.Listen("tcp", listenAddr) + if err != nil { + logger.Fatal("failed to listen for publicrpc service", zap.Error(err)) + } + logger.Info(fmt.Sprintf("publicrpc server listening on %s", listenAddr)) + + rpcServer := &publicrpcServer{ + rawHeartbeatListeners: rawHeartbeatListeners, + logger: logger.Named("publicrpcserver"), + } + + grpcServer := grpc.NewServer() + publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer) + return supervisor.GRPCServer(grpcServer, l, false) +} diff --git a/bridge/pkg/publicrpc/rawheartbeats.go b/bridge/pkg/publicrpc/rawheartbeats.go new file mode 100644 index 000000000..159ccf1f2 --- /dev/null +++ b/bridge/pkg/publicrpc/rawheartbeats.go @@ -0,0 +1,87 @@ +package publicrpc + +import ( + "math/rand" + "sync" + + "github.com/prometheus/client_golang/prometheus" + "go.uber.org/zap" + + publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" +) + +// track the number of active connections +var ( + currentPublicHeartbeatStreamsOpen = prometheus.NewGauge( + prometheus.GaugeOpts{ + Name: "wormhole_publicrpc_rawheartbeat_connections", + Help: "Current number of clients consuming gRPC raw heartbeat streams", + }) +) + +func init() { + prometheus.MustRegister(currentPublicHeartbeatStreamsOpen) +} + +// multiplexing to distribute heartbeat messages to all the open connections +type PublicRawHeartbeatConnections struct { + mu sync.RWMutex + subs map[int]chan<- *publicrpcv1.Heartbeat + logger *zap.Logger +} + +func HeartbeatStreamMultiplexer(logger *zap.Logger) *PublicRawHeartbeatConnections { + ps := &PublicRawHeartbeatConnections{ + subs: map[int]chan<- *publicrpcv1.Heartbeat{}, + logger: logger.Named("heartbeatmultiplexer"), + } + return ps +} + +// getUniqueClientId loops to generate & test integers for existence as key of map. returns an int that is not a key in map. +func (ps *PublicRawHeartbeatConnections) getUniqueClientId() int { + clientId := rand.Intn(1e6) + found := false + for found { + clientId = rand.Intn(1e6) + _, found = ps.subs[clientId] + } + return clientId +} + +// subscribeHeartbeats adds a channel to the subscriber map, keyed by arbitary clientId +func (ps *PublicRawHeartbeatConnections) subscribeHeartbeats(ch chan *publicrpcv1.Heartbeat) int { + ps.mu.Lock() + defer ps.mu.Unlock() + + clientId := ps.getUniqueClientId() + ps.logger.Info("subscribeHeartbeats for client", zap.Int("client", clientId)) + ps.subs[clientId] = ch + currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs))) + return clientId +} + +// PublishHeartbeat sends a message to all channels in the subscription map +func (ps *PublicRawHeartbeatConnections) PublishHeartbeat(msg *publicrpcv1.Heartbeat) { + ps.mu.RLock() + defer ps.mu.RUnlock() + + for client, ch := range ps.subs { + select { + case ch <- msg: + ps.logger.Debug("published message to client", zap.Int("client", client)) + default: + ps.logger.Debug("buffer overrrun when attempting to publish message", zap.Int("client", client)) + } + } +} + +// unsubscribeHeartbeats removes the client's channel from the subscription map +func (ps *PublicRawHeartbeatConnections) unsubscribeHeartbeats(clientId int) { + ps.mu.Lock() + defer ps.mu.Unlock() + + ps.logger.Debug("unsubscribeHeartbeats for client", zap.Int("clientId", clientId)) + delete(ps.subs, clientId) + currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs))) +} diff --git a/devnet/bridge.yaml b/devnet/bridge.yaml index 3f25f236d..5483b5b39 100644 --- a/devnet/bridge.yaml +++ b/devnet/bridge.yaml @@ -10,6 +10,9 @@ spec: - port: 8999 name: p2p protocol: UDP + - port: 7070 + name: public-grpc + protocol: TCP clusterIP: None selector: app: guardian @@ -84,6 +87,8 @@ spec: - --unsafeDevMode - --bridgeKey - /tmp/bridge.key + - --publicRPC + - "[::]:7070" - --adminSocket - /tmp/admin.sock # - --logLevel=debug @@ -103,6 +108,9 @@ spec: - containerPort: 6060 name: pprof protocol: TCP + - containerPort: 7070 + name: public-grpc + protocol: TCP - name: agent image: solana-agent volumeMounts: diff --git a/proto/gossip/v1/gossip.proto b/proto/gossip/v1/gossip.proto index 5f455740a..b7d2d0377 100644 --- a/proto/gossip/v1/gossip.proto +++ b/proto/gossip/v1/gossip.proto @@ -2,7 +2,8 @@ syntax = "proto3"; package gossip.v1; -option go_package = "proto/gossip/v1;gossipv1"; +// full path of the resulting Go file is required in order to import in whisper.proto +option go_package = "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1;gossipv1"; message GossipMessage { oneof message { diff --git a/proto/publicrpc/v1/publicrpc.proto b/proto/publicrpc/v1/publicrpc.proto new file mode 100644 index 000000000..7d688a9ac --- /dev/null +++ b/proto/publicrpc/v1/publicrpc.proto @@ -0,0 +1,22 @@ +syntax = "proto3"; + +// only relevant for protobuf namespace +package publicrpc.v1; + +// only relevant for Go namespace +option go_package = "proto/publicrpc/v1;publicrpcv1"; + +// public import will include the required types in the Go output +import public "gossip/v1/gossip.proto"; + +// Publicrpc service exposes endpoints to be consumed externally; GUIs, historical record keeping, etc. +service Publicrpc { + // GetRawHeartbeats rpc endpoint returns a stream of the p2p heartbeat messages received. + // The GetRawHeartbeats stream will include all messages received by the guardian, + // without any filtering or verification of message content. + rpc GetRawHeartbeats (GetRawHeartbeatsRequest) returns (stream gossip.v1.Heartbeat); +} + +// GetRawHeartbeatsRequest is an empty request, sent as part of a request to start a stream. +message GetRawHeartbeatsRequest { +}