node: re-observe backfilled VAAs

commit-id:8a9af802
This commit is contained in:
Leo 2021-12-19 17:27:06 +01:00 committed by Leopold Schabel
parent 6260d5a148
commit 2a7c32ca3f
2 changed files with 52 additions and 9 deletions

View File

@ -2,10 +2,13 @@ package guardiand
import (
"context"
"encoding/base64"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"github.com/certusone/wormhole/node/pkg/db"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
publicrpcv1 "github.com/certusone/wormhole/node/pkg/proto/publicrpc/v1"
"github.com/certusone/wormhole/node/pkg/publicrpc"
ethcommon "github.com/ethereum/go-ethereum/common"
@ -27,9 +30,10 @@ import (
type nodePrivilegedService struct {
nodev1.UnimplementedNodePrivilegedServiceServer
db *db.Database
injectC chan<- *vaa.VAA
logger *zap.Logger
db *db.Database
injectC chan<- *vaa.VAA
logger *zap.Logger
signedInC chan *gossipv1.SignedVAAWithQuorum
}
// adminGuardianSetUpdateToVAA converts a nodev1.GuardianSetUpdate message to its canonical VAA representation.
@ -200,7 +204,7 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
}
// fetchMissing attempts to backfill a gap by fetching missing signed VAAs from the network.
// fetchMissing attempts to backfill a gap by fetching and storing missing signed VAAs from the network.
// Returns true if the gap was filled, false otherwise.
func (s *nodePrivilegedService) fetchMissing(
ctx context.Context,
@ -242,12 +246,50 @@ func (s *nodePrivilegedService) fetchMissing(
resp.Body.Close()
continue
case http.StatusOK:
type getVaaResp struct {
VaaBytes string `json:"vaaBytes"`
}
var respBody getVaaResp
if err := json.NewDecoder(resp.Body).Decode(&respBody); err != nil {
resp.Body.Close()
s.logger.Warn("failed to decode VAA response",
zap.String("node", node),
zap.String("chain", chain.String()),
zap.String("address", addr),
zap.Uint64("sequence", seq),
zap.Error(err),
)
continue
}
// base64 decode the VAA bytes
vaaBytes, err := base64.StdEncoding.DecodeString(respBody.VaaBytes)
if err != nil {
resp.Body.Close()
s.logger.Warn("failed to decode VAA body",
zap.String("node", node),
zap.String("chain", chain.String()),
zap.String("address", addr),
zap.Uint64("sequence", seq),
zap.Error(err),
)
continue
}
s.logger.Info("backfilled VAA",
zap.Uint16("chain", uint16(chain)),
zap.String("address", addr),
zap.Uint64("sequence", seq),
zap.Int("numBytes", len(vaaBytes)),
)
// Inject into the gossip signed VAA receive path.
// This has the same effect as if the VAA was received from the network
// (verifying signature, publishing to BigTable, storing in local DB...).
s.signedInC <- &gossipv1.SignedVAAWithQuorum{
Vaa: vaaBytes,
}
resp.Body.Close()
return true, nil
default:
@ -300,7 +342,7 @@ func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *no
}, nil
}
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<- *vaa.VAA, signedInC chan *gossipv1.SignedVAAWithQuorum, db *db.Database, gst *common.GuardianSetState) (supervisor.Runnable, error) {
// Delete existing UNIX socket, if present.
fi, err := os.Stat(socketPath)
if err == nil {
@ -333,9 +375,10 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
logger.Info("admin server listening on", zap.String("path", socketPath))
nodeService := &nodePrivilegedService{
injectC: injectC,
db: db,
logger: logger.Named("adminservice"),
injectC: injectC,
db: db,
logger: logger.Named("adminservice"),
signedInC: signedInC,
}
publicrpcService := publicrpc.NewPublicrpcServer(logger, db, gst)

View File

@ -513,7 +513,7 @@ func runNode(cmd *cobra.Command, args []string) {
}
// local admin service socket
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, db, gst)
adminService, err := adminServiceRunnable(logger, *adminSocketPath, injectC, signedInC, db, gst)
if err != nil {
logger.Fatal("failed to create admin service socket", zap.Error(err))
}