From c7662d611e33b4c8095a97510b10003ae041987d Mon Sep 17 00:00:00 2001 From: Leo Date: Sun, 8 Aug 2021 11:38:21 +0200 Subject: [PATCH] node: store heartbeats for multiple nodes per guardian It's safe to break the proto API at this point. Change-Id: I235100c5fef3abc9259d28f68d9bb7bf2be0ae5e --- bridge/cmd/guardiand/adminnodes.go | 35 +++++++++------- bridge/pkg/common/guardianset.go | 55 +++++++++++++++++++------ bridge/pkg/p2p/p2p.go | 12 ++++-- bridge/pkg/publicrpc/publicrpcserver.go | 19 ++++----- proto/publicrpc/v1/publicrpc.proto | 20 +++++++-- 5 files changed, 95 insertions(+), 46 deletions(-) diff --git a/bridge/cmd/guardiand/adminnodes.go b/bridge/cmd/guardiand/adminnodes.go index 6f08c1402..3cd155d98 100644 --- a/bridge/cmd/guardiand/adminnodes.go +++ b/bridge/cmd/guardiand/adminnodes.go @@ -3,7 +3,6 @@ package guardiand import ( "context" "fmt" - gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/bridge/pkg/vaa" "github.com/spf13/cobra" @@ -79,37 +78,41 @@ func runListNodes(cmd *cobra.Command, args []string) { log.Fatalf("failed to list nodes: %v", err) } - nodes := make([]*gossipv1.Heartbeat, len(lastHeartbeats.RawHeartbeats)) - i := 0 - for _, v := range lastHeartbeats.RawHeartbeats { - nodes[i] = v - i += 1 - } + nodes := lastHeartbeats.Entries + sort.Slice(nodes, func(i, j int) bool { - return nodes[i].NodeName < nodes[j].NodeName + if nodes[i].RawHeartbeat == nil || nodes[j].RawHeartbeat == nil { + return false + } + return nodes[i].RawHeartbeat.NodeName < nodes[j].RawHeartbeat.NodeName }) log.Printf("%d nodes in guardian state set", len(nodes)) w := tabwriter.NewWriter(os.Stdout, 0, 8, 2, ' ', 0) - w.Write([]byte("Guardian key\tNode name\tVersion\tLast seen\tUptime\tSolana\tEthereum\tTerra\tBSC\n")) + w.Write([]byte("Node key\tGuardian key\tNode name\tVersion\tLast seen\tUptime\tSolana\tEthereum\tTerra\tBSC\n")) for _, h := range 
nodes { - last := time.Unix(0, h.Timestamp) + if h.RawHeartbeat == nil { + continue + } + + last := time.Unix(0, h.RawHeartbeat.Timestamp) heights := map[vaa.ChainID]int64{} - for _, n := range h.Networks { + for _, n := range h.RawHeartbeat.Networks { heights[vaa.ChainID(n.Id)] = n.Height } fmt.Fprintf(w, - "%s\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\n", - h.GuardianAddr, - h.NodeName, - h.Version, + "%s\t%s\t%s\t%s\t%s\t%d\t%d\t%d\t%d\t%d\n", + h.P2PNodeAddr, + h.RawHeartbeat.GuardianAddr, + h.RawHeartbeat.NodeName, + h.RawHeartbeat.Version, time.Since(last), - h.Counter, + h.RawHeartbeat.Counter, heights[vaa.ChainIDSolana], heights[vaa.ChainIDEthereum], heights[vaa.ChainIDTerra], diff --git a/bridge/pkg/common/guardianset.go b/bridge/pkg/common/guardianset.go index 76a1fa7db..bcdcb8c19 100644 --- a/bridge/pkg/common/guardianset.go +++ b/bridge/pkg/common/guardianset.go @@ -1,9 +1,10 @@ package common import ( + "fmt" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" "github.com/ethereum/go-ethereum/common" - "google.golang.org/protobuf/proto" + "github.com/libp2p/go-libp2p-core/peer" "sync" ) @@ -16,6 +17,13 @@ import ( // but presumably, chain-specific transaction size limits will apply at some point (untested). const MaxGuardianCount = 19 +// MaxNodesPerGuardian specifies the maximum number of nodes per guardian key that we'll accept +// whenever we maintain any per-guardian, per-node state. +// +// There currently isn't any state cleanup, so the value is on the high side to prevent +// accidentally reaching the limit due to operational mistakes. +const MaxNodesPerGuardian = 15 + type GuardianSet struct { // Guardian's public key hashes truncated by the ETH standard hashing mechanism (20 bytes). Keys []common.Address @@ -49,14 +57,14 @@ type GuardianSetState struct { mu sync.Mutex current *GuardianSet - // Last heartbeat message received per guardian per p2p node. 
Maintained // across guardian set updates - these values don't change. - lastHeartbeat map[common.Address]*gossipv1.Heartbeat + lastHeartbeats map[common.Address]map[peer.ID]*gossipv1.Heartbeat } func NewGuardianSetState() *GuardianSetState { return &GuardianSetState{ - lastHeartbeat: map[common.Address]*gossipv1.Heartbeat{}, + lastHeartbeats: map[common.Address]map[peer.ID]*gossipv1.Heartbeat{}, } } @@ -76,29 +84,50 @@ func (st *GuardianSetState) Get() *GuardianSet { // LastHeartbeat returns the most recent heartbeat message received for // a given guardian node, or nil if none have been received. -func (st *GuardianSetState) LastHeartbeat(addr common.Address) *gossipv1.Heartbeat { +func (st *GuardianSetState) LastHeartbeat(addr common.Address) map[peer.ID]*gossipv1.Heartbeat { st.mu.Lock() defer st.mu.Unlock() - return st.lastHeartbeat[addr] + ret := make(map[peer.ID]*gossipv1.Heartbeat) + for k, v := range st.lastHeartbeats[addr] { + ret[k] = v + } + return ret } -// SetHeartBeat stores a verified heartbeat observed by a given guardian. -func (st *GuardianSetState) SetHeartBeat(addr common.Address, hb *gossipv1.Heartbeat) { +// SetHeartbeat stores a verified heartbeat observed by a given guardian. +func (st *GuardianSetState) SetHeartbeat(addr common.Address, peerId peer.ID, hb *gossipv1.Heartbeat) error { st.mu.Lock() defer st.mu.Unlock() - st.lastHeartbeat[addr] = hb + + v, ok := st.lastHeartbeats[addr] + + if !ok { + v = make(map[peer.ID]*gossipv1.Heartbeat) + st.lastHeartbeats[addr] = v + } else { + if len(v) >= MaxNodesPerGuardian { + // TODO: age out old entries? + return fmt.Errorf("too many nodes (%d) for guardian, cannot store entry", len(v)) + } + } + + v[peerId] = hb + return nil } // GetAll returns all stored heartbeats. 
-func (st *GuardianSetState) GetAll() map[common.Address]*gossipv1.Heartbeat { +func (st *GuardianSetState) GetAll() map[common.Address]map[peer.ID]*gossipv1.Heartbeat { st.mu.Lock() defer st.mu.Unlock() - ret := make(map[common.Address]*gossipv1.Heartbeat) + ret := make(map[common.Address]map[peer.ID]*gossipv1.Heartbeat) // Deep copy - for k, v := range st.lastHeartbeat { - ret[k] = proto.Clone(v).(*gossipv1.Heartbeat) + for addr, v := range st.lastHeartbeats { + ret[addr] = make(map[peer.ID]*gossipv1.Heartbeat) + for peerId, hb := range v { + ret[addr][peerId] = hb + } } return ret diff --git a/bridge/pkg/p2p/p2p.go b/bridge/pkg/p2p/p2p.go index 7cb3239eb..d04fbba05 100644 --- a/bridge/pkg/p2p/p2p.go +++ b/bridge/pkg/p2p/p2p.go @@ -203,7 +203,9 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat ourAddr := ethcrypto.PubkeyToAddress(gk.PublicKey) rawHeartbeatListeners.PublishHeartbeat(heartbeat) - gst.SetHeartBeat(ourAddr, heartbeat) + if err := gst.SetHeartbeat(ourAddr, h.ID(), heartbeat); err != nil { + panic(err) + } b, err := proto.Marshal(heartbeat) if err != nil { @@ -303,7 +305,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat zap.String("from", envelope.GetFrom().String())) break } - if heartbeat, err := processSignedHeartbeat(s, gs, gst, disableHeartbeatVerify); err != nil { + if heartbeat, err := processSignedHeartbeat(envelope.GetFrom(), s, gs, gst, disableHeartbeatVerify); err != nil { p2pMessagesReceived.WithLabelValues("invalid_heartbeat").Inc() logger.Warn("invalid signed heartbeat received", zap.Error(err), @@ -332,7 +334,7 @@ func Run(obsvC chan *gossipv1.SignedObservation, sendC chan []byte, rawHeartbeat } } -func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.GuardianSet, gst *bridge_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) { +func processSignedHeartbeat(from peer.ID, s *gossipv1.SignedHeartbeat, gs 
*bridge_common.GuardianSet, gst *bridge_common.GuardianSetState, disableVerify bool) (*gossipv1.Heartbeat, error) { envelopeAddr := common.BytesToAddress(s.GuardianAddr) idx, ok := gs.KeyIndex(envelopeAddr) var pk common.Address @@ -363,7 +365,9 @@ func processSignedHeartbeat(s *gossipv1.SignedHeartbeat, gs *bridge_common.Guard } // Store verified heartbeat in global guardian set state. - gst.SetHeartBeat(signerAddr, &h) + if err := gst.SetHeartbeat(signerAddr, from, &h); err != nil { + return nil, fmt.Errorf("failed to store in guardian set state: %w", err) + } return &h, nil } diff --git a/bridge/pkg/publicrpc/publicrpcserver.go b/bridge/pkg/publicrpc/publicrpcserver.go index fcb73c668..2ee676343 100644 --- a/bridge/pkg/publicrpc/publicrpcserver.go +++ b/bridge/pkg/publicrpc/publicrpcserver.go @@ -44,20 +44,19 @@ func (s *PublicrpcServer) GetLastHeartbeats(ctx context.Context, req *publicrpcv } resp := &publicrpcv1.GetLastHeartbeatResponse{ - RawHeartbeats: make(map[string]*gossipv1.Heartbeat), - } - - // Request heartbeat for every guardian set entry. This ensures that - // offline guardians will be listed with a null heartbeat. 
- for _, addr := range gs.Keys { - hb := s.gst.LastHeartbeat(addr) - resp.RawHeartbeats[addr.Hex()] = hb + Entries: make([]*publicrpcv1.GetLastHeartbeatResponse_Entry, 0), } // Fetch all heartbeats (including from nodes not in the guardian set - which // can happen either with --disableHeartbeatVerify or when the guardian set changes) - for addr, hb := range s.gst.GetAll() { - resp.RawHeartbeats[addr.Hex()] = hb + for addr, v := range s.gst.GetAll() { + for peerId, hb := range v { + resp.Entries = append(resp.Entries, &publicrpcv1.GetLastHeartbeatResponse_Entry{ + VerifiedGuardianAddr: addr.Hex(), + P2PNodeAddr: peerId.Pretty(), + RawHeartbeat: hb, + }) + } } return resp, nil diff --git a/proto/publicrpc/v1/publicrpc.proto b/proto/publicrpc/v1/publicrpc.proto index 1fdeea7a4..e92fbd3a1 100644 --- a/proto/publicrpc/v1/publicrpc.proto +++ b/proto/publicrpc/v1/publicrpc.proto @@ -19,7 +19,7 @@ enum EmitterChain { message MessageID { // Emitter chain ID. EmitterChain emitter_chain = 1; - // Hex-encoded emitter address. + // Hex-encoded (without leading 0x) emitter address. string emitter_address = 2; // Sequence number for (emitter_chain, emitter_address). int64 sequence = 3; @@ -67,6 +67,20 @@ message GetLastHeartbeatRequest { } message GetLastHeartbeatResponse { - // Mapping of hex-encoded guardian addresses to raw heartbeat messages. - map<string, gossip.v1.Heartbeat> raw_heartbeats = 1; + message Entry { + // Verified, hex-encoded (with leading 0x) guardian address. This is the guardian address + // which signed this heartbeat. The GuardianAddr field inside the heartbeat + // is NOT verified - remote nodes can put arbitrary data in it. + string verified_guardian_addr = 1; + + // Base58-encoded libp2p node address that sent this heartbeat, used to + // distinguish between multiple nodes running for the same guardian. + string p2p_node_addr = 2; + + // Raw heartbeat received from the network. Data is only as trusted + // as the guardian node that sent it - none of the fields are verified. 
+ gossip.v1.Heartbeat raw_heartbeat = 3; + } + + repeated Entry entries = 1; }