node: add GetLastHeartbeats RPC call

This aggregates verified guardian heartbeats server-side so they
can be fetched via unary calls.

Change-Id: I8458b139bb5d75f87ed700b50684a5ff8ca594fa
This commit is contained in:
Leo 2021-08-03 20:03:00 +02:00 committed by Leopold Schabel
parent 952a9d9db9
commit 82731c22c0
7 changed files with 86 additions and 11 deletions

View File

@ -132,7 +132,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil
}
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) {
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
// Delete existing UNIX socket, if present.
fi, err := os.Stat(socketPath)
if err == nil {
@ -166,7 +166,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
logger: logger.Named("adminservice"),
}
publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db)
publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db, gst)
grpcServer := grpc.NewServer()
nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService)

View File

@ -375,7 +375,7 @@ func runBridge(cmd *cobra.Command, args []string) {
injectC := make(chan *vaa.VAA)
// Guardian set state managed by processor
gst := &common.GuardianSetState{}
gst := common.NewGuardianSetState()
// Load p2p private key
var priv crypto.PrivKey
@ -394,13 +394,13 @@ func runBridge(cmd *cobra.Command, args []string) {
// subscriber channel multiplexing for public gPRC streams
rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger)
publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db)
publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db, gst)
if err != nil {
log.Fatal("failed to create publicrpc service socket", zap.Error(err))
}
// local admin service socket
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db)
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db, gst)
if err != nil {
logger.Fatal("failed to create admin service socket", zap.Error(err))
}

View File

@ -2,6 +2,7 @@ package guardiand
import (
"fmt"
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/db"
publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/bridge/pkg/publicrpc"
@ -11,7 +12,7 @@ import (
"net"
)
func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) {
func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
l, err := net.Listen("tcp", listenAddr)
if err != nil {
return nil, fmt.Errorf("failed to listen: %w", err)
@ -19,7 +20,7 @@ func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicr
logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String()))
rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db)
rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db, gst)
grpcServer := grpc.NewServer()
publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer)

View File

@ -1,10 +1,13 @@
package common
import (
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
"github.com/ethereum/go-ethereum/common"
"sync"
)
// MaxGuardianCount specifies the maximum number of guardians supported by on-chain contracts.
//
// Matching constants:
// - MAX_LEN_GUARDIAN_KEYS in Solana contract (limited by transaction size - 19 is the maximum amount possible)
//
@ -29,7 +32,7 @@ func (g *GuardianSet) KeysAsHexStrings() []string {
return r
}
// Get a given address index from the guardian set. Returns (-1, false)
// KeyIndex returns a given address index from the guardian set. Returns (-1, false)
// if the address wasn't found and (addr, true) otherwise.
func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
for n, k := range g.Keys {
@ -44,6 +47,16 @@ func (g *GuardianSet) KeyIndex(addr common.Address) (int, bool) {
type GuardianSetState struct {
mu sync.Mutex
current *GuardianSet
// Last heartbeat message received per guardian. Maintained
// across guardian set updates - these values don't change.
lastHeartbeat map[common.Address]*gossipv1.Heartbeat
}
func NewGuardianSetState() *GuardianSetState {
return &GuardianSetState{
lastHeartbeat: map[common.Address]*gossipv1.Heartbeat{},
}
}
func (st *GuardianSetState) Set(set *GuardianSet) {
@ -59,3 +72,18 @@ func (st *GuardianSetState) Get() *GuardianSet {
return st.current
}
// LastHeartbeat returns the most recent heartbeat message received for
// a given guardian node, or nil if none have been received.
func (st *GuardianSetState) LastHeartbeat(addr common.Address) *gossipv1.Heartbeat {
st.mu.Lock()
defer st.mu.Unlock()
return st.lastHeartbeat[addr]
}
// SetHeartBeat stores a verified heartbeat observed by a given guardian.
func (st *GuardianSetState) SetHeartBeat(addr common.Address, hb *gossipv1.Heartbeat) {
st.mu.Lock()
defer st.mu.Unlock()
st.lastHeartbeat[addr] = hb
}

View File

@ -301,7 +301,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
zap.String("from", envelope.GetFrom().String()))
break
}
if heartbeat, err := processSignedHeartbeat(s, gs); err != nil {
if heartbeat, err := processSignedHeartbeat(s, gs, gst); err != nil {
p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc()
logger.Warn("invalid signed heartbeat received",
zap.Error(err),
@ -330,7 +330,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat
}
}
func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet) (*gossipv1.Heartbeat, error) {
func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState) (*gossipv1.Heartbeat, error) {
envelopeAddr := common.BytesToAddress(s.GuardianAddr)
idx, ok := gs.KeyIndex(envelopeAddr)
if !ok {
@ -357,5 +357,8 @@ func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.Guard
return nil, fmt.Errorf("failed to unmarshal heartbeat: %w", err)
}
// Store verified heartbeat in global guardian set state.
gst.SetHeartBeat(signerAddr, &h)
return &h, nil
}

View File

@ -4,6 +4,7 @@ import (
"context"
"encoding/hex"
"fmt"
"github.com/certusone/wormhole/bridge/pkg/common"
"github.com/certusone/wormhole/bridge/pkg/db"
gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1"
publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1"
@ -19,16 +20,41 @@ type PublicrpcServer struct {
rawHeartbeatListeners *RawHeartbeatConns
logger *zap.Logger
db *db.Database
gst *common.GuardianSetState
}
func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns, db *db.Database) *PublicrpcServer {
func NewPublicrpcServer(
logger *zap.Logger,
rawHeartbeatListeners *RawHeartbeatConns,
db *db.Database,
gst *common.GuardianSetState,
) *PublicrpcServer {
return &PublicrpcServer{
rawHeartbeatListeners: rawHeartbeatListeners,
logger: logger.Named("publicrpcserver"),
db: db,
gst: gst,
}
}
func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv1.GetLastHeartbeatRequest) (*publicrpcv1.GetLastHeartbeatResponse, error) {
gs := s.gst.Get()
if gs == nil {
return nil, status.Error(codes.Unavailable, "guardian set not fetched from chain yet")
}
resp := &publicrpcv1.GetLastHeartbeatResponse{
RawHeartbeats: make(map[string]*gossipv1.Heartbeat),
}
for _, addr := range gs.Keys {
hb := s.gst.LastHeartbeat(addr)
resp.RawHeartbeats[addr.Hex()] = hb
}
return resp, nil
}
func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequest, stream publicrpcv1.Publicrpc_GetRawHeartbeatsServer) error {
s.logger.Info("gRPC heartbeat stream opened by client")

View File

@ -36,6 +36,15 @@ service Publicrpc {
};
};
// GetLastHeartbeats returns the last heartbeat received for each guardian node in the
// node's active guardian set. Heartbeats received by nodes not in the guardian set are ignored.
// The heartbeat value is null if no heartbeat has yet been received.
rpc GetLastHeartbeats (GetLastHeartbeatRequest) returns (GetLastHeartbeatResponse) {
option (google.api.http) = {
get: "/v1/heartbeats"
};
}
rpc GetSignedVAA (GetSignedVAARequest) returns (GetSignedVAAResponse) {
option (google.api.http) = {
get: "/v1/signed_vaa/{message_id.emitter_chain}/{message_id.emitter_address}/{message_id.sequence}"
@ -53,3 +62,11 @@ message GetSignedVAARequest {
message GetSignedVAAResponse {
bytes vaa_bytes = 1;
}
message GetLastHeartbeatRequest {
}
message GetLastHeartbeatResponse {
// Mapping of hex-encoded guardian addresses to raw heartbeat messages.
map<string, gossip.v1.Heartbeat> raw_heartbeats = 1;
}