From 2a7c32ca3fa9601cbeb5d9079bbb19292856a4da Mon Sep 17 00:00:00 2001 From: Leo Date: Sun, 19 Dec 2021 17:27:06 +0100 Subject: [PATCH] node: re-observe backfilled VAAs commit-id:8a9af802 --- node/cmd/guardiand/adminserver.go | 59 ++++++++++++++++++++++++++----- node/cmd/guardiand/node.go | 2 +- 2 files changed, 52 insertions(+), 9 deletions(-) diff --git a/node/cmd/guardiand/adminserver.go b/node/cmd/guardiand/adminserver.go index 8665624e7..1e2b38252 100644 --- a/node/cmd/guardiand/adminserver.go +++ b/node/cmd/guardiand/adminserver.go @@ -2,10 +2,13 @@ package guardiand import ( "context" + "encoding/base64" "encoding/hex" + "encoding/json" "errors" "fmt" "github.com/certusone/wormhole/node/pkg/db" + gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1" "github.com/certusone/wormhole/node/pkg/publicrpc" ethcommon "github.com/ethereum/go-ethereum/common" @@ -27,9 +30,10 @@ import ( type nodePrivilegedService struct { nodev1.UnimplementedNodePrivilegedServiceServer - db *db.Database - injectC chan<- *vaa.VAA - logger *zap.Logger + db *db.Database + injectC chan<- *vaa.VAA + logger *zap.Logger + signedInC chan *gossipv1.SignedVAAWithQuorum } // adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation. @@ -200,7 +204,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil } -// fetchMissing attempts to backfill a gap by fetching missing signed VAAs from the network. +// fetchMissing attempts to backfill a gap by fetching and storing missing signed VAAs from the network. // Returns true if the gap was filled, false otherwise. func (s *nodePrivilegedService) fetchMissing( ctx context.Context, @@ -242,12 +246,50 @@ func (s *nodePrivilegedService) fetchMissing( resp.Body.Close() continue case http.StatusOK: + type getVaaResp struct { + VaaBytes string `json:"vaaBytes"` + } + var respBody getVaaResp + if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil { + resp.Body.Close() + s.logger.Warn("failed to decode VAA response", + zap.String("node", node), + zap.String("chain", chain.String()), + zap.String("address", addr), + zap.Uint64("sequence", seq), + zap.Error(err), + ) + continue + } + + // base64 decode the VAA bytes + vaaBytes, err := base64.StdEncoding.DecodeString(respBody.VaaBytes) + if err != nil { + resp.Body.Close() + s.logger.Warn("failed to decode VAA body", + zap.String("node", node), + zap.String("chain", chain.String()), + zap.String("address", addr), + zap.Uint64("sequence", seq), + zap.Error(err), + ) + continue + } + s.logger.Info("backfilled VAA", zap.Uint16("chain", uint16(chain)), zap.String("address", addr), zap.Uint64("sequence", seq), + zap.Int("numBytes", len(vaaBytes)), ) + // Inject into the gossip signed VAA receive path. + // This has the same effect as if the VAA was received from the network + // (verifying signature, publishing to BigTable, storing in local DB...). + s.signedInC <- &gossipv1.SignedVAAWithQuorum{ + Vaa: vaaBytes, + } + resp.Body.Close() return true, nil default: @@ -300,7 +342,7 @@ func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *no }, nil } -func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { +func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) { // Delete existing UNIX socket, if present. fi, err := os.Stat(socketPath) if err == nil { @@ -333,9 +375,10 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- logger.Info("admin server listening on", zap.String("path", socketPath)) nodeService := &nodePrivilegedService{ - injectC: injectC, - db: db, - logger: logger.Named("adminservice"), + injectC: injectC, + db: db, + logger: logger.Named("adminservice"), + signedInC: signedInC, } publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index 973a9aa31..282941a8d 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -513,7 +513,7 @@ func runNode(cmd *cobra.Command, args []string) { } // local admin service socket - adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, db, gst) + adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, db, gst) if err != nil { logger.Fatal("failed to create admin service socket", zap.Error(err)) }