node: add support for backfilling via RPC
commit-id:5cad9f7b
This commit is contained in:
parent
8e695c674a
commit
6260d5a148
|
@ -24,6 +24,7 @@ import (
|
||||||
|
|
||||||
var (
|
var (
|
||||||
clientSocketPath *string
|
clientSocketPath *string
|
||||||
|
shouldBackfill *bool
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -35,6 +36,9 @@ func init() {
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
shouldBackfill = AdminClientFindMissingMessagesCmd.Flags().Bool(
|
||||||
|
"backfill", false, "backfill missing VAAs from public RPC")
|
||||||
|
|
||||||
AdminClientInjectGuardianSetUpdateCmd.Flags().AddFlagSet(pf)
|
AdminClientInjectGuardianSetUpdateCmd.Flags().AddFlagSet(pf)
|
||||||
AdminClientFindMissingMessagesCmd.Flags().AddFlagSet(pf)
|
AdminClientFindMissingMessagesCmd.Flags().AddFlagSet(pf)
|
||||||
AdminClientListNodes.Flags().AddFlagSet(pf)
|
AdminClientListNodes.Flags().AddFlagSet(pf)
|
||||||
|
@ -134,7 +138,7 @@ func runFindMissingMessages(cmd *cobra.Command, args []string) {
|
||||||
}
|
}
|
||||||
emitterAddress := args[1]
|
emitterAddress := args[1]
|
||||||
|
|
||||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
|
||||||
defer cancel()
|
defer cancel()
|
||||||
|
|
||||||
conn, err, c := getAdminClient(ctx, *clientSocketPath)
|
conn, err, c := getAdminClient(ctx, *clientSocketPath)
|
||||||
|
@ -146,6 +150,8 @@ func runFindMissingMessages(cmd *cobra.Command, args []string) {
|
||||||
msg := nodev1.FindMissingMessagesRequest{
|
msg := nodev1.FindMissingMessagesRequest{
|
||||||
EmitterChain: uint32(chainID),
|
EmitterChain: uint32(chainID),
|
||||||
EmitterAddress: emitterAddress,
|
EmitterAddress: emitterAddress,
|
||||||
|
RpcBackfill: *shouldBackfill,
|
||||||
|
BackfillNodes: publicRPCEndpoints,
|
||||||
}
|
}
|
||||||
resp, err := c.FindMissingMessages(ctx, &msg)
|
resp, err := c.FindMissingMessages(ctx, &msg)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
|
|
@ -13,8 +13,11 @@ import (
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"math"
|
"math"
|
||||||
|
"math/rand"
|
||||||
"net"
|
"net"
|
||||||
|
"net/http"
|
||||||
"os"
|
"os"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/certusone/wormhole/node/pkg/common"
|
"github.com/certusone/wormhole/node/pkg/common"
|
||||||
nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
|
nodev1 "github.com/certusone/wormhole/node/pkg/proto/node/v1"
|
||||||
|
@ -197,6 +200,65 @@ func (s *nodePrivilegedService) InjectGovernanceVAA(ctx context.Context, req *no
|
||||||
return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
|
return &nodev1.InjectGovernanceVAAResponse{Digests: digests}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// fetchMissing attempts to backfill a gap by fetching missing signed VAAs from the network.
|
||||||
|
// Returns true if the gap was filled, false otherwise.
|
||||||
|
func (s *nodePrivilegedService) fetchMissing(
|
||||||
|
ctx context.Context,
|
||||||
|
nodes []string,
|
||||||
|
c *http.Client,
|
||||||
|
chain vaa.ChainID,
|
||||||
|
addr string,
|
||||||
|
seq uint64) (bool, error) {
|
||||||
|
|
||||||
|
// shuffle the list of public RPC endpoints
|
||||||
|
rand.Shuffle(len(nodes), func(i, j int) {
|
||||||
|
nodes[i], nodes[j] = nodes[j], nodes[i]
|
||||||
|
})
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(ctx, time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
for _, node := range nodes {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf(
|
||||||
|
"%s/v1/signed_vaa/%d/%s/%d", node, chain, addr, seq), nil)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
s.logger.Warn("failed to fetch missing VAA",
|
||||||
|
zap.String("node", node),
|
||||||
|
zap.String("chain", chain.String()),
|
||||||
|
zap.String("address", addr),
|
||||||
|
zap.Uint64("sequence", seq),
|
||||||
|
zap.Error(err),
|
||||||
|
)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
switch resp.StatusCode {
|
||||||
|
case http.StatusNotFound:
|
||||||
|
resp.Body.Close()
|
||||||
|
continue
|
||||||
|
case http.StatusOK:
|
||||||
|
s.logger.Info("backfilled VAA",
|
||||||
|
zap.Uint16("chain", uint16(chain)),
|
||||||
|
zap.String("address", addr),
|
||||||
|
zap.Uint64("sequence", seq),
|
||||||
|
)
|
||||||
|
|
||||||
|
resp.Body.Close()
|
||||||
|
return true, nil
|
||||||
|
default:
|
||||||
|
resp.Body.Close()
|
||||||
|
return false, fmt.Errorf("unexpected response status: %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
|
func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *nodev1.FindMissingMessagesRequest) (*nodev1.FindMissingMessagesResponse, error) {
|
||||||
b, err := hex.DecodeString(req.EmitterAddress)
|
b, err := hex.DecodeString(req.EmitterAddress)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -213,6 +275,20 @@ func (s *nodePrivilegedService) FindMissingMessages(ctx context.Context, req *no
|
||||||
return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
|
return nil, status.Errorf(codes.Internal, "database operation failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if req.RpcBackfill {
|
||||||
|
c := &http.Client{}
|
||||||
|
unfilled := make([]uint64, 0, len(ids))
|
||||||
|
for _, id := range ids {
|
||||||
|
if ok, err := s.fetchMissing(ctx, req.BackfillNodes, c, vaa.ChainID(req.EmitterChain), emitterAddress.String(), id); err != nil {
|
||||||
|
return nil, status.Errorf(codes.Internal, "failed to backfill VAA: %v", err)
|
||||||
|
} else if ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
unfilled = append(unfilled, id)
|
||||||
|
}
|
||||||
|
ids = unfilled
|
||||||
|
}
|
||||||
|
|
||||||
resp := make([]string, len(ids))
|
resp := make([]string, len(ids))
|
||||||
for i, v := range ids {
|
for i, v := range ids {
|
||||||
resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
|
resp[i] = fmt.Sprintf("%d/%s/%d", req.EmitterChain, emitterAddress, v)
|
||||||
|
|
|
@ -0,0 +1,12 @@
|
||||||
|
package guardiand
|
||||||
|
|
||||||
|
// This list is duplicated a couple times across the codebase - make to to update all copies!
|
||||||
|
|
||||||
|
var publicRPCEndpoints = []string{
|
||||||
|
"https://wormhole-v2-mainnet-api.certus.one",
|
||||||
|
"https://wormhole.inotel.ro",
|
||||||
|
"https://wormhole-v2-mainnet-api.mcf.rocks",
|
||||||
|
"https://wormhole-v2-mainnet-api.chainlayer.network",
|
||||||
|
"https://wormhole-v2-mainnet-api.staking.fund",
|
||||||
|
"https://wormhole-v2-mainnet.01node.com",
|
||||||
|
}
|
|
@ -119,6 +119,10 @@ message FindMissingMessagesRequest {
|
||||||
uint32 emitter_chain = 1;
|
uint32 emitter_chain = 1;
|
||||||
// Hex-encoded (without leading 0x) emitter address to iterate.
|
// Hex-encoded (without leading 0x) emitter address to iterate.
|
||||||
string emitter_address = 2;
|
string emitter_address = 2;
|
||||||
|
// Whether to attempt to backfill missing messages from a list of remote nodes.
|
||||||
|
bool rpc_backfill = 3;
|
||||||
|
// List of remote nodes to backfill from.
|
||||||
|
repeated string backfill_nodes = 4;
|
||||||
}
|
}
|
||||||
|
|
||||||
message FindMissingMessagesResponse {
|
message FindMissingMessagesResponse {
|
||||||
|
|
Loading…
Reference in New Issue