From 723cf5fe955a4158e53c2fb0b9898d6dee2a0312 Mon Sep 17 00:00:00 2001 From: Leo Date: Sat, 31 Jul 2021 02:25:22 +0200 Subject: [PATCH] node: add GetSignedVAA endpoint Works: $ curl 'http://localhost:7071/v1/signed_vaa/1/1268b2bf4a[...]/0' {"vaaBytes":"AQAAAAABACbK50nrmgWPtTmRlYf/[...]"} Bug: certusone/wormhole#282 Change-Id: I09eade00c4649c550f06a2efe350d6d9ff9da3ae --- bridge/cmd/guardiand/adminserver.go | 5 +-- bridge/cmd/guardiand/bridge.go | 4 +-- bridge/cmd/guardiand/publicrpc.go | 5 +-- bridge/pkg/db/db.go | 9 +++++- bridge/pkg/processor/observation.go | 3 +- bridge/pkg/publicrpc/publicrpcserver.go | 42 ++++++++++++++++++++++++- bridge/pkg/vaa/structs.go | 5 +++ proto/publicrpc/v1/publicrpc.proto | 32 +++++++++++++++++++ 8 files changed, 96 insertions(+), 9 deletions(-) diff --git a/bridge/cmd/guardiand/adminserver.go b/bridge/cmd/guardiand/adminserver.go index 114f3af41..997831e60 100644 --- a/bridge/cmd/guardiand/adminserver.go +++ b/bridge/cmd/guardiand/adminserver.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "github.com/certusone/wormhole/bridge/pkg/db" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/bridge/pkg/publicrpc" "math" @@ -131,7 +132,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no return &nodev1.InjectGovernanceVAAResponse{Digest: digest.Bytes()}, nil } -func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns) (supervisor.Runnable, error) { +func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) { // Delete existing UNIX socket, if present. fi, err := os.Stat(socketPath) if err == nil { @@ -165,7 +166,7 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- logger: logger.Named("adminservice"), } - publicrpcService := publicrpc.NewPublicrpcServer(logger, hl) + publicrpcService := publicrpc.NewPublicrpcServer(logger, hl, db) grpcServer := grpc.NewServer() nodev1.RegisterNodePrivilegedServer(grpcServer, nodeService) diff --git a/bridge/cmd/guardiand/bridge.go b/bridge/cmd/guardiand/bridge.go index ba387ffe7..2d954ef95 100644 --- a/bridge/cmd/guardiand/bridge.go +++ b/bridge/cmd/guardiand/bridge.go @@ -385,13 +385,13 @@ func runBridge(cmd *cobra.Command, args []string) { // subscriber channel multiplexing for public gPRC streams rawHeartbeatListeners := publicrpc.HeartbeatStreamMultiplexer(logger) - publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners) + publicrpcService, err := publicrpcServiceRunnable(logger, *publicRPC, rawHeartbeatListeners, db) if err != nil { log.Fatal("failed to create publicrpc service socket", zap.Error(err)) } // local admin service socket - adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners) + adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, rawHeartbeatListeners, db) if err != nil { logger.Fatal("failed to create admin service socket", zap.Error(err)) } diff --git a/bridge/cmd/guardiand/publicrpc.go b/bridge/cmd/guardiand/publicrpc.go index 93ef09e5d..6abf723a9 100644 --- a/bridge/cmd/guardiand/publicrpc.go +++ b/bridge/cmd/guardiand/publicrpc.go @@ -2,6 +2,7 @@ package guardiand import ( "fmt" + "github.com/certusone/wormhole/bridge/pkg/db" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/bridge/pkg/publicrpc" "github.com/certusone/wormhole/bridge/pkg/supervisor" @@ -10,7 +11,7 @@ import ( "net" ) -func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns) (supervisor.Runnable, error) { +func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicrpc.RawHeartbeatConns, db *db.Database) (supervisor.Runnable, error) { l, err := net.Listen("tcp", listenAddr) if err != nil { return nil, fmt.Errorf("failed to listen: %w", err) @@ -18,7 +19,7 @@ func publicrpcServiceRunnable(logger *zap.Logger, listenAddr string, hl *publicr logger.Info("publicrpc server listening", zap.String("addr", l.Addr().String())) - rpcServer := publicrpc.NewPublicrpcServer(logger, hl) + rpcServer := publicrpc.NewPublicrpcServer(logger, hl, db) grpcServer := grpc.NewServer() publicrpcv1.RegisterPublicrpcServer(grpcServer, rpcServer) diff --git a/bridge/pkg/db/db.go b/bridge/pkg/db/db.go index eeda266bd..035ca8aa0 100644 --- a/bridge/pkg/db/db.go +++ b/bridge/pkg/db/db.go @@ -54,6 +54,8 @@ func (d *Database) StoreSignedVAA(v *vaa.VAA) error { b, _ := v.Marshal() + // TODO: panic if same VAA is stored with different value + err := d.db.Update(func(txn *badger.Txn) error { if err := txn.Set(vaaIDFromVAA(v).Bytes(), b); err != nil { return err @@ -74,11 +76,16 @@ func (d *Database) GetSignedVAABytes(id VAAID) (b []byte, err error) { if err != nil { return err } - if _, err := item.ValueCopy(b); err != nil { + if val, err := item.ValueCopy(nil); err != nil { return err + } else { + b = val } return nil }); err != nil { + if err == badger.ErrKeyNotFound { + return nil, ErrVAANotFound + } return nil, err } return diff --git a/bridge/pkg/processor/observation.go b/bridge/pkg/processor/observation.go index 296da9a9c..1071efd8e 100644 --- a/bridge/pkg/processor/observation.go +++ b/bridge/pkg/processor/observation.go @@ -230,7 +230,8 @@ func (p *Processor) handleObservation(ctx context.Context, m *gossipv1.SignedObs p.logger.Info("signed VAA with quorum", zap.String("digest", hash), zap.Any("vaa", signed), - zap.String("bytes", hex.EncodeToString(vaaBytes))) + zap.String("bytes", hex.EncodeToString(vaaBytes)), + zap.String("message_id", signed.MessageID())) if err := p.db.StoreSignedVAA(signed); err != nil { p.logger.Error("failed to store signed VAA", zap.Error(err)) diff --git a/bridge/pkg/publicrpc/publicrpcserver.go b/bridge/pkg/publicrpc/publicrpcserver.go index 6692f019e..2d15889b5 100644 --- a/bridge/pkg/publicrpc/publicrpcserver.go +++ b/bridge/pkg/publicrpc/publicrpcserver.go @@ -1,9 +1,16 @@ package publicrpc import ( + "context" + "encoding/hex" + "fmt" + "github.com/certusone/wormhole/bridge/pkg/db" gossipv1 "github.com/certusone/wormhole/bridge/pkg/proto/gossip/v1" publicrpcv1 "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1" + "github.com/certusone/wormhole/bridge/pkg/vaa" "go.uber.org/zap" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" ) // PublicrpcServer implements the publicrpc gRPC service. @@ -11,12 +18,14 @@ type PublicrpcServer struct { publicrpcv1.UnimplementedPublicrpcServer rawHeartbeatListeners *RawHeartbeatConns logger *zap.Logger + db *db.Database } -func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns) *PublicrpcServer { +func NewPublicrpcServer(logger *zap.Logger, rawHeartbeatListeners *RawHeartbeatConns, db *db.Database) *PublicrpcServer { return &PublicrpcServer{ rawHeartbeatListeners: rawHeartbeatListeners, logger: logger.Named("publicrpcserver"), + db: db, } } @@ -40,3 +49,34 @@ func (s *PublicrpcServer) GetRawHeartbeats(req *publicrpcv1.GetRawHeartbeatsRequ } } } + +func (s *PublicrpcServer) GetSignedVAA(ctx context.Context, req *publicrpcv1.GetSignedVAARequest) (*publicrpcv1.GetSignedVAAResponse, error) { + address, err := hex.DecodeString(req.MessageId.EmitterAddress) + if err != nil { + return nil, status.Error(codes.InvalidArgument, fmt.Sprintf("failed to decode address: %v", err)) + } + if len(address) != 32 { + return nil, status.Error(codes.InvalidArgument, "address must be 32 bytes") + } + + addr := vaa.Address{} + copy(addr[:], address) + + b, err := s.db.GetSignedVAABytes(db.VAAID{ + EmitterChain: vaa.ChainID(req.MessageId.EmitterChain.Number()), + EmitterAddress: addr, + Sequence: uint64(req.MessageId.Sequence), + }) + + if err != nil { + if err == db.ErrVAANotFound { + return nil, status.Error(codes.NotFound, err.Error()) + } + s.logger.Error("failed to fetch VAA", zap.Error(err), zap.Any("request", req)) + return nil, status.Error(codes.Internal, "internal server error") + } + + return &publicrpcv1.GetSignedVAAResponse{ + VaaBytes: b, + }, nil +} diff --git a/bridge/pkg/vaa/structs.go b/bridge/pkg/vaa/structs.go index 7bf009aae..c93cc4a47 100644 --- a/bridge/pkg/vaa/structs.go +++ b/bridge/pkg/vaa/structs.go @@ -241,6 +241,11 @@ func (v *VAA) Marshal() ([]byte, error) { return buf.Bytes(), nil } +// MessageID returns a human-readable emitter_chain/emitter_address/sequence tuple. +func (v *VAA) MessageID() string { + return fmt.Sprintf("%d/%s/%d", v.EmitterChain, v.EmitterAddress, v.Sequence) +} + func (v *VAA) serializeBody() ([]byte, error) { buf := new(bytes.Buffer) MustWrite(buf, binary.BigEndian, uint32(v.Timestamp.Unix())) diff --git a/proto/publicrpc/v1/publicrpc.proto b/proto/publicrpc/v1/publicrpc.proto index e89e089fb..5883b1318 100644 --- a/proto/publicrpc/v1/publicrpc.proto +++ b/proto/publicrpc/v1/publicrpc.proto @@ -7,6 +7,24 @@ option go_package = "github.com/certusone/wormhole/bridge/pkg/proto/publicrpc/v1 import "gossip/v1/gossip.proto"; import "google/api/annotations.proto"; +enum EmitterChain { + CHAIN_ID_UNKNOWN = 0; + CHAIN_ID_SOLANA = 1; + CHAIN_ID_ETHEREUM = 2; + CHAIN_ID_TERRA = 3; + CHAIN_ID_BSC = 4; +} + +// MessageID is a VAA's globally unique identifier (see data availability design document). +message MessageID { + // Emitter chain ID. + EmitterChain emitter_chain = 1; + // Hex-encoded emitter address. + string emitter_address = 2; + // Sequence number for (emitter_chain, emitter_address). + int64 sequence = 3; +} + // Publicrpc service exposes endpoints to be consumed externally; GUIs, historical record keeping, etc. service Publicrpc { // GetRawHeartbeats rpc endpoint returns a stream of the p2p heartbeat messages received. @@ -17,7 +35,21 @@ service Publicrpc { get: "/v1/heartbeats:stream_raw" }; }; + + rpc GetSignedVAA (GetSignedVAARequest) returns (GetSignedVAAResponse) { + option (google.api.http) = { + get: "/v1/signed_vaa/{message_id.emitter_chain}/{message_id.emitter_address}/{message_id.sequence}" + }; + } } message GetRawHeartbeatsRequest { } + +message GetSignedVAARequest { + MessageID message_id = 1; +} + +message GetSignedVAAResponse { + bytes vaa_bytes = 1; +}