From 54b2e51379ada6212df0f897c877bd0eb869a1b9 Mon Sep 17 00:00:00 2001 From: Leo Date: Thu, 22 Jul 2021 13:26:19 +0200 Subject: [PATCH] publicrpc: move runnable to cmd/guardiand This matches the adminrpc implementation, removing the runnable and socket listener from pkg/publicrpc API surface. Change-Id: Ia6461c2ff839f39462391c5afd2694b1619b30b6 --- bridge/cmd/guardiand/bridge.go | 9 +++++++-- bridge/cmd/guardiand/publicrpc.go | 26 +++++++++++++++++++++++++ bridge/pkg/publicrpc/publicrpcserver.go | 22 +-------------------- 3 files changed, 34 insertions(+), 23 deletions(-) create mode 100644 bridge/cmd/guardiand/publicrpc.go diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index eac64c710..a253284f6 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -3,6 +3,7 @@ package guardiand import ( "context" "fmt" + "log" "net/http" _ "net/http/pprof" "os" @@ -352,6 +353,7 @@ func runBridge(cmd *cobra.Command, args []string) { } } + // local admin service socket adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC) if err != nil { logger.Fatal("failed to create admin service socket", zap.Error(err)) @@ -359,6 +361,10 @@ func runBridge(cmd *cobra.Command, args []string) { // subscriber channel multiplexing for public gPRC streams rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger) + publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners) + if err != nil { + log.Fatal("failed to create publicrpc service socket", zap.Error(err)) + } // Run supervisor. supervisor.New(rootCtx, logger, func(ctx context.Context) error { @@ -407,8 +413,7 @@ func runBridge(cmd *cobra.Command, args []string) { return err } if *publicRPC != "" { - if err := supervisor.Run(ctx, "publicrpc", - publicrpc.PublicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners)); err != nil { + if err := supervisor.Run(ctx, "publicrpc", publicrpcService); err != nil { return err } } diff --git a/bridge/cmd/guardiand/publicrpc.go b/bridge/cmd/guardiand/publicrpc.go new file mode 100644 index 000000000..93ef09e5d --- /dev/null +++ b/bridge/cmd/guardiand/publicrpc.go @@ -0,0 +1,26 @@ +package guardiand + +import ( + "fmt" + publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" + "github.com/certusone/wormhole/bridge/pkg/publicrpc" + "github.com/certusone/wormhole/bridge/pkg/supervisor" + "go.uber.org/zap" + "google.golang.org/grpc" + "net" +) + +func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns) (supervisor.Runnable, error) { + l, err := net.Listen("tcp", listenAddr) + if err != nil { + return nil, fmt.Errorf("failed to listen: %w", err) + } + + logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String())) + + rpcServer := publicrpc.NewPublicrpcServer(logger, hl) + grpcServer := grpc.NewServer() + publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer) + + return supervisor.GRPCServer(grpcServer, l, false), nil +} diff --git a/bridge/pkg/publicrpc/publicrpcserver.go b/bridge/pkg/publicrpc/publicrpcserver.go index cd85ea5ef..b91ecd3ce 100644 --- a/bridge/pkg/publicrpc/publicrpcserver.go +++ b/bridge/pkg/publicrpc/publicrpcserver.go @@ -1,14 +1,8 @@ package publicrpc import ( - "fmt" - "net" - - "go.uber.org/zap" - "google.golang.org/grpc" - publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" - "github.com/certusone/wormhole/bridge/pkg/supervisor" + "go.uber.org/zap" ) // PublicrpcServer implements the publicrpc gRPC service. @@ -45,17 +39,3 @@ func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequ } } } - -func PublicrpcServiceRunnable(logger *zap.Logger, listenAddr string, rawHeartbeatListeners *RawHeartbeatConns) supervisor.Runnable { - l, err := net.Listen("tcp", listenAddr) - if err != nil { - logger.Fatal("failed to listen for publicrpc service", zap.Error(err)) - } - logger.Info(fmt.Sprintf("publicrpc server listening on %s", listenAddr)) - - rpcServer := NewPublicrpcServer(logger, rawHeartbeatListeners) - - grpcServer := grpc.NewServer() - publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer) - return supervisor.GRPCServer(grpcServer, l, false) -}