node: remove raw heartbeat stream

As discussed with Justin, we no longer need this endpoint.

Removing it means we no longer have to worry about
long-lived connections.

Change-Id: I75020652d383a6b5f79a3cad1b52ae87d323f012
This commit is contained in:
Leo 2021-08-21 22:47:15 +02:00 committed by Leopold Schabel
parent dc94553751
commit 3ec4ad6ad3
9 changed files with 14 additions and 186 deletions

View File

@ -31,12 +31,10 @@ func init() {
}
AdminClientInjectGuardianSetUpdateCmd.Flags().AddFlagSet(pf)
AdminClientListNodesStream.Flags().AddFlagSet(pf)
AdminClientListNodes.Flags().AddFlagSet(pf)
AdminCmd.AddCommand(AdminClientInjectGuardianSetUpdateCmd)
AdminCmd.AddCommand(AdminClientGovernanceVAAVerifyCmd)
AdminCmd.AddCommand(AdminClientListNodesStream)
AdminCmd.AddCommand(AdminClientListNodes)
}

View File

@ -6,7 +6,6 @@ import (
publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/bridge/pkg/vaa"
"github.com/spf13/cobra"
"io"
"log"
"os"
"sort"
@ -17,48 +16,6 @@ import (
// How to test in container:
// kubectl exec guardian-0 -- /guardiand admin list-nodes --socket /tmp/admin.sock
var AdminClientListNodesStream = &cobra.Command{
Use: "list-nodes-stream",
Short: "Listens to heartbeats and displays an aggregated real-time list of guardian nodes",
Run: runListNodesStream,
}
func runListNodesStream(cmd *cobra.Command, args []string) {
ctx := context.Background()
conn, err, c := getPublicrpcClient(ctx, *clientSocketPath)
defer conn.Close()
if err != nil {
log.Fatalf("failed to get publicrpc client: %v", err)
}
stream, err := c.GetRawHeartbeats(ctx, &publicrpcv1.GetRawHeartbeatsRequest{})
if err != nil {
log.Fatalf("failed to stream heartbeats: %v", err)
}
log.Print("connected, streaming updates")
seen := make(map[string]bool)
w := tabwriter.NewWriter(os.Stdout, 20, 8, 1, '\t', 0)
for {
hb, err := stream.Recv()
if err == io.EOF {
log.Print("server closed connection, exiting")
return
} else if err != nil {
log.Fatalf("error streaming updates: %v", err)
}
if seen[hb.GuardianAddr] {
continue
}
fmt.Fprintf(w, "%s\t%s\t%s\t\n", hb.GuardianAddr, hb.NodeName, hb.Version)
w.Flush()
seen[hb.GuardianAddr] = true
}
}
var (
showDetails bool
)

View File

@ -136,7 +136,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
}
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
// Delete existing UNIX socket, if present.
fi, err := os.Stat(socketPath)
if err == nil {
@ -170,7 +170,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
logger: logger.Named("adminservice"),
}
publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db, gst)
publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)
grpcServer := newGRPCServer(logger)
nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService)

View File

@ -31,7 +31,6 @@ import (
"github.com/certusone/wormhole/bridge/pkg/p2p"
"github.com/certusone/wormhole/bridge/pkg/processor"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/publicrpc"
"github.com/certusone/wormhole/bridge/pkg/readiness"
solana "github.com/certusone/wormhole/bridge/pkg/solana"
"github.com/certusone/wormhole/bridge/pkg/supervisor"
@ -432,15 +431,13 @@ func runBridge(cmd *cobra.Command, args []string) {
}
}
// subscriber channel multiplexing for public gPRC streams
rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger)
publicrpcService, publicrpcServer, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db, gst)
publicrpcService, publicrpcServer, err := publicrpcServiceRunnable(logger, *publicRPC, db, gst)
if err != nil {
log.Fatal("failed to create publicrpc service socket", zap.Error(err))
}
// local admin service socket
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db, gst)
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, db, gst)
if err != nil {
logger.Fatal("failed to create admin service socket", zap.Error(err))
}
@ -454,7 +451,7 @@ func runBridge(cmd *cobra.Command, args []string) {
// Run supervisor.
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "p2p", p2p.Run(
obsvC, sendC, rawHeartbeatListeners, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
obsvC, sendC, priv, gk, gst, *p2pPort, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, rootCtxCancel)); err != nil {
return err
}

View File

@ -12,7 +12,7 @@ import (
"net"
)
func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, *grpc.Server, error) {
func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, *grpc.Server, error) {
l, err := net.Listen("tcp", listenAddr)
if err != nil {
return nil, nil, fmt.Errorf("failed to listen: %w", err)
@ -20,7 +20,7 @@ func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicr
logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))
rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db, gst)
rpcServer := publicrpc.NewPublicrpcServer(logger, db, gst)
grpcServer := newGRPCServer(logger)
publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer)

View File

@ -32,7 +32,6 @@ import (
"google.golang.org/protobuf/proto"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/bridge/pkg/publicrpc"
"github.com/certusone/wormhole/bridge/pkg/supervisor"
)
@ -60,7 +59,7 @@ func heartbeatDigest(b []byte) common.Hash {
return ethcrypto.Keccak256Hash(append(heartbeatMessagePrefix, b...))
}
func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeatListeners *publicrpc.RawHeartbeatConns, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, priv crypto.PrivKey, gk *ecdsa.PrivateKey, gst *bridge_common.GuardianSetState, port uint, networkID string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, rootCtxCancel context.CancelFunc) func(ctx context.Context) error {
return func(ctx context.Context) (re error) {
logger := supervisor.Logger(ctx)
@ -208,7 +207,6 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
}
ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey)
rawHeartbeatListeners.PublishHeartbeat(heartbeat)
if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil {
panic(err)
}
@ -318,7 +316,6 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
logger.Debug("valid signed heartbeat received",
zap.Any("value", heartbeat),
zap.String("from", envelope.GetFrom().String()))
rawHeartbeatListeners.PublishHeartbeat(heartbeat)
}
case *gossipv1.GossipMessage_SignedObservation:
obsvC <- m.SignedObservation

View File

@ -6,7 +6,6 @@ import (
"fmt"
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/db"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/bridge/pkg/vaa"
"go.uber.org/zap"
@ -17,23 +16,20 @@ import (
// PublicrpcServer implements the publicrpc gRPC service.
type PublicrpcServer struct {
publicrpcv1.UnimplementedPublicrpcServer
rawHeartbeatListeners *RawHeartbeatConns
logger *zap.Logger
db *db.Database
gst *common.GuardianSetState
logger *zap.Logger
db *db.Database
gst *common.GuardianSetState
}
func NewPublicrpcServer(
logger *zap.Logger,
rawHeartbeatListeners *RawHeartbeatConns,
db *db.Database,
gst *common.GuardianSetState,
) *PublicrpcServer {
return &PublicrpcServer{
rawHeartbeatListeners: rawHeartbeatListeners,
logger: logger.Named("publicrpcserver"),
db: db,
gst: gst,
logger: logger.Named("publicrpcserver"),
db: db,
gst: gst,
}
}
@ -62,27 +58,6 @@ func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv
return resp, nil
}
func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error {
s.logger.Info("gRPC heartbeat stream opened by client")
// create a channel and register it for heartbeats
receiveChan := make(chan *gossipv1.Heartbeat, 50)
// clientId is the reference to the subscription that we will use for unsubscribing when the client disconnects.
clientId := s.rawHeartbeatListeners.subscribeHeartbeats(receiveChan)
for {
select {
// Exit on stream context done
case <-stream.Context().Done():
s.logger.Info("raw heartbeat stream closed by client", zap.Int("clientId", clientId))
s.rawHeartbeatListeners.unsubscribeHeartbeats(clientId)
return stream.Context().Err()
case msg := <-receiveChan:
stream.Send(msg)
}
}
}
func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.GetSignedVAARequest) (*publicrpcv1.GetSignedVAAResponse, error) {
address, err := hex.DecodeString(req.MessageId.EmitterAddress)
if err != nil {

View File

@ -1,84 +0,0 @@
package publicrpc
import (
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/prometheus/client_golang/prometheus/promauto"
"math/rand"
"sync"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/zap"
)
// track the number of active connections
var (
currentPublicHeartbeatStreamsOpen = promauto.NewGauge(
prometheus.GaugeOpts{
Name: "wormhole_publicrpc_rawheartbeat_connections",
Help: "Current number of clients consuming gRPC raw heartbeat streams",
})
)
// RawHeartbeatConns holds the multiplexing state required for distribution of
// heartbeat messages to all the open connections.
type RawHeartbeatConns struct {
mu sync.RWMutex
subs map[int]chan<- *gossipv1.Heartbeat
logger *zap.Logger
}
func HeartbeatStreamMultiplexer(logger *zap.Logger) *RawHeartbeatConns {
ps := &RawHeartbeatConns{
subs: map[int]chan<- *gossipv1.Heartbeat{},
logger: logger.Named("heartbeatmultiplexer"),
}
return ps
}
// getUniqueClientId loops to generate & test integers for existence as key of map. returns an int that is not a key in map.
func (ps *RawHeartbeatConns) getUniqueClientId() int {
clientId := rand.Intn(1e6)
found := false
for found {
clientId = rand.Intn(1e6)
_, found = ps.subs[clientId]
}
return clientId
}
// subscribeHeartbeats adds a channel to the subscriber map, keyed by arbitrary clientId
func (ps *RawHeartbeatConns) subscribeHeartbeats(ch chan *gossipv1.Heartbeat) int {
ps.mu.Lock()
defer ps.mu.Unlock()
clientId := ps.getUniqueClientId()
ps.logger.Info("subscribeHeartbeats for client", zap.Int("client", clientId))
ps.subs[clientId] = ch
currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs)))
return clientId
}
// PublishHeartbeat sends a message to all channels in the subscription map
func (ps *RawHeartbeatConns) PublishHeartbeat(msg *gossipv1.Heartbeat) {
ps.mu.RLock()
defer ps.mu.RUnlock()
for client, ch := range ps.subs {
select {
case ch <- msg:
ps.logger.Debug("published message to client", zap.Int("client", client))
default:
ps.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
}
}
}
// unsubscribeHeartbeats removes the client's channel from the subscription map
func (ps *RawHeartbeatConns) unsubscribeHeartbeats(clientId int) {
ps.mu.Lock()
defer ps.mu.Unlock()
ps.logger.Debug("unsubscribeHeartbeats for client", zap.Int("clientId", clientId))
delete(ps.subs, clientId)
currentPublicHeartbeatStreamsOpen.Set(float64(len(ps.subs)))
}

View File

@ -27,15 +27,6 @@ message MessageID {
// Publicrpc service exposes endpoints to be consumed externally; GUIs, historical record keeping, etc.
service Publicrpc {
// GetRawHeartbeats rpc endpoint returns a stream of the p2p heartbeat messages received.
// The GetRawHeartbeats stream will include all messages received by the guardian,
// without any filtering or verification of message content.
rpc GetRawHeartbeats (GetRawHeartbeatsRequest) returns (stream gossip.v1.Heartbeat) {
option (google.api.http) = {
get: "/v1/heartbeats:stream_raw"
};
};
// GetLastHeartbeats returns the last heartbeat received for each guardian node in the
// node's active guardian set. Heartbeats received by nodes not in the guardian set are ignored.
// The heartbeat value is null if no heartbeat has yet been received.
@ -59,9 +50,6 @@ service Publicrpc {
}
message GetRawHeartbeatsRequest {
}
message GetSignedVAARequest {
MessageID message_id = 1;
}