Node: Gateway Relayer (#3243)

This commit is contained in:
bruce-riley 2023-08-03 10:26:50 -05:00 committed by GitHub
parent e5329e7ca4
commit 1721fef95a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
11 changed files with 446 additions and 15 deletions

View File

@ -207,6 +207,10 @@ var (
bigTableKeyPath *string
chainGovernorEnabled *bool
gatewayRelayerContract *string
gatewayRelayerKeyPath *string
gatewayRelayerKeyPassPhrase *string
)
func init() {
@ -367,6 +371,10 @@ func init() {
bigTableKeyPath = NodeCmd.Flags().String("bigTableKeyPath", "", "Path to json Service Account key")
chainGovernorEnabled = NodeCmd.Flags().Bool("chainGovernorEnabled", false, "Run the chain governor")
gatewayRelayerContract = NodeCmd.Flags().String("gatewayRelayerContract", "", "Address of the smart contract on wormchain to receive relayed VAAs")
gatewayRelayerKeyPath = NodeCmd.Flags().String("gatewayRelayerKeyPath", "", "Path to gateway relayer private key for signing transactions")
gatewayRelayerKeyPassPhrase = NodeCmd.Flags().String("gatewayRelayerKeyPassPhrase", "", "Pass phrase used to unarmor the gateway relayer key file")
}
var (
@ -1003,6 +1011,40 @@ func runNode(cmd *cobra.Command, args []string) {
}
}
var gatewayRelayerWormchainConn *wormconn.ClientConn
if *gatewayRelayerContract != "" {
if *wormchainURL == "" {
logger.Fatal("if gatewayRelayerContract is specified, wormchainURL is required", zap.String("component", "gwrelayer"))
}
if *gatewayRelayerKeyPath == "" {
logger.Fatal("if gatewayRelayerContract is specified, gatewayRelayerKeyPath is required", zap.String("component", "gwrelayer"))
}
if *gatewayRelayerKeyPassPhrase == "" {
logger.Fatal("if gatewayRelayerContract is specified, gatewayRelayerKeyPassPhrase is required", zap.String("component", "gwrelayer"))
}
wormchainKeyPathName := *gatewayRelayerKeyPath
if *unsafeDevMode {
idx, err := devnet.GetDevnetIndex()
if err != nil {
logger.Fatal("failed to get devnet index", zap.Error(err), zap.String("component", "gwrelayer"))
}
wormchainKeyPathName = fmt.Sprint(*gatewayRelayerKeyPath, idx)
}
wormchainKey, err = wormconn.LoadWormchainPrivKey(wormchainKeyPathName, *gatewayRelayerKeyPassPhrase)
if err != nil {
logger.Fatal("failed to load private key", zap.Error(err), zap.String("component", "gwrelayer"))
}
logger.Info("Connecting to wormchain", zap.String("wormchainURL", *wormchainURL), zap.String("gatewayRelayerKeyPath", wormchainKeyPathName), zap.String("component", "gwrelayer"))
gatewayRelayerWormchainConn, err = wormconn.NewConn(rootCtx, *wormchainURL, wormchainKey)
if err != nil {
logger.Fatal("failed to connect to wormchain", zap.Error(err), zap.String("component", "gwrelayer"))
}
}
var watcherConfigs = []watchers.WatcherConfig{}
if shouldStart(ethRPC) {
@ -1362,6 +1404,7 @@ func runNode(cmd *cobra.Command, args []string) {
node.GuardianOptionWatchers(watcherConfigs, ibcWatcherConfig),
node.GuardianOptionAccountant(*accountantContract, *accountantWS, *accountantCheckEnabled, wormchainConn),
node.GuardianOptionGovernor(*chainGovernorEnabled),
node.GuardianOptionGatewayRelayer(*gatewayRelayerContract, gatewayRelayerWormchainConn),
node.GuardianOptionAdminService(*adminSocketPath, ethRPC, ethContract, rpcMap),
node.GuardianOptionP2P(p2pKey, *p2pNetworkID, *p2pBootstrap, *nodeName, *disableHeartbeatVerify, *p2pPort, ibc.GetFeatures),
node.GuardianOptionStatusServer(*statusAddr),

View File

@ -354,7 +354,8 @@ func runSpy(cmd *cobra.Command, args []string) {
nil,
nil,
components,
nil, // ibc feature string
nil, // ibc feature string
false, // gateway relayer enabled
)); err != nil {
return err
}

View File

@ -0,0 +1,274 @@
// The GatewayRelayer manages the interface to the ibcTranslator smart contract on wormchain. It is called when a signed VAA with quorum gets published.
// It forwards all payload three VAAs destined for the ibcTranslator contract on wormchain to that contract.
package gwrelayer
import (
"context"
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"time"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/supervisor"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
wasmdtypes "github.com/CosmWasm/wasmd/x/wasm/types"
sdktypes "github.com/cosmos/cosmos-sdk/types"
sdktx "github.com/cosmos/cosmos-sdk/types/tx"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"go.uber.org/zap"
)
type (
GatewayRelayerWormchainConn interface {
Close()
SenderAddress() string
SubmitQuery(ctx context.Context, contractAddress string, query []byte) ([]byte, error)
SignAndBroadcastTx(ctx context.Context, msg sdktypes.Msg) (*sdktx.BroadcastTxResponse, error)
BroadcastTxResponseToString(txResp *sdktx.BroadcastTxResponse) string
}
)
// GatewayRelayer is the object that manages the interface to the wormchain accountant smart contract.
type GatewayRelayer struct {
ctx context.Context
logger *zap.Logger
contractAddress string
wormchainConn GatewayRelayerWormchainConn
env common.Environment
subChan chan *vaa.VAA
targetAddress vaa.Address
}
// subChanSize is the capacity of the submit channel used to publish VAAs.
const subChanSize = 50
var (
vaasSubmitted = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gwrelayer_vaas_submitted",
Help: "Total number of VAAs submitted to the gateway relayer",
})
channelFullErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gwrelayer_channel_full_errors",
Help: "Total number of VAAs dropped because the gateway relayer channel was full",
})
submitErrors = promauto.NewCounter(
prometheus.CounterOpts{
Name: "gwrelayer_submit_errors",
Help: "Total number of errors encountered while submitting VAAs to the gateway relayer",
})
)
// NewGatewayRelayer creates a new instance of the GatewayRelayer object.
func NewGatewayRelayer(
ctx context.Context,
logger *zap.Logger,
contractAddress string,
wormchainConn GatewayRelayerWormchainConn,
env common.Environment,
) *GatewayRelayer {
if contractAddress == "" {
return nil // This is not an error, it just means the feature is not enabled.
}
return &GatewayRelayer{
ctx: ctx,
logger: logger.With(zap.String("component", "gwrelayer")),
contractAddress: contractAddress,
wormchainConn: wormchainConn,
env: env,
subChan: make(chan *vaa.VAA, subChanSize),
}
}
// Start initializes the gateway relayer and starts the worker runnable that submits VAAs to the contract.
func (gwr *GatewayRelayer) Start(ctx context.Context) error {
var err error
gwr.targetAddress, err = convertBech32AddressToWormhole(gwr.contractAddress)
if err != nil {
return err
}
gwr.logger.Info("starting gateway relayer", zap.String("contract", gwr.contractAddress), zap.String("targetAddress", hex.EncodeToString(gwr.targetAddress.Bytes())))
// Start the watcher to listen to transfer events from the smart contract.
if gwr.env == common.GoTest {
// We're not in a runnable context, so we can't use supervisor.
go func() {
_ = gwr.worker(ctx)
}()
} else {
if err := supervisor.Run(ctx, "gwrworker", common.WrapWithScissors(gwr.worker, "gwrworker")); err != nil {
return fmt.Errorf("failed to start submit vaa worker: %w", err)
}
}
return nil
}
// Close closes the connection to the smart contract.
func (gwr *GatewayRelayer) Close() {
if gwr.wormchainConn != nil {
gwr.wormchainConn.Close()
gwr.wormchainConn = nil
}
}
// convertBech32AddressToWormhole converts a bech32 address to a wormhole address.
func convertBech32AddressToWormhole(contractAddress string) (vaa.Address, error) {
addrBytes, err := sdktypes.GetFromBech32(contractAddress, "wormhole")
if err != nil {
return vaa.Address{}, fmt.Errorf(`failed to decode target address "%s": %w`, contractAddress, err)
}
return vaa.Address(addrBytes), nil
}
// SubmitVAA checks to see if the VAA should be submitted to the smart contract, and if so, writes it to the channel for publishing.
func (gwr *GatewayRelayer) SubmitVAA(v *vaa.VAA) {
if shouldPub, err := shouldPublish(v.Payload, vaa.ChainIDWormchain, gwr.targetAddress); err != nil {
gwr.logger.Error("failed to check if vaa should be published", zap.String("msgId", v.MessageID()), zap.Error(err))
return
} else if !shouldPub {
return
}
select {
case gwr.subChan <- v:
gwr.logger.Debug("submitted vaa to channel", zap.String("msgId", v.MessageID()))
default:
channelFullErrors.Inc()
gwr.logger.Error("unable to submit vaa because the channel is full, dropping it", zap.String("msgId", v.MessageID()))
}
}
// shouldPublish returns true if a message should be forwarded to the contract on wormchain, false if not.
func shouldPublish(payload []byte, targetChain vaa.ChainID, targetAddress vaa.Address) (bool, error) {
if len(payload) == 0 {
return false, nil
}
if payload[0] != 3 {
return false, nil
}
hdr, err := vaa.DecodeTransferPayloadHdr(payload)
if err != nil {
return false, fmt.Errorf("failed to decode payload: %w", err)
}
if hdr.TargetChain != targetChain || hdr.TargetAddress != targetAddress {
return false, nil
}
return true, nil
}
// worker listens for VAAs and submits them to the smart contract.
func (gwr *GatewayRelayer) worker(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
case v := <-gwr.subChan:
if err := gwr.submitVAAToContract(v); err != nil {
gwr.logger.Error("failed to submit vaa to contract", zap.String("msgId", v.MessageID()), zap.Error(err))
return fmt.Errorf("failed to submit vaa to contract: %w", err)
}
}
}
}
// submitVAAToContract submits a VAA to the smart contract on wormchain.
func (gwr *GatewayRelayer) submitVAAToContract(v *vaa.VAA) error {
_, err := SubmitVAAToContract(gwr.ctx, gwr.logger, gwr.wormchainConn, gwr.contractAddress, v)
if err != nil {
submitErrors.Inc()
return err
}
// TODO: Need to check txResp for "VAA already submitted", which should not be an error.
vaasSubmitted.Inc()
return nil
}
type (
completeTransferAndConvertMsg struct {
Params completeTransferAndConvertParams `json:"complete_transfer_and_convert"`
}
completeTransferAndConvertParams struct {
VAA []byte `json:"vaa"`
}
)
// SubmitVAAToContract submits a VAA to the smart contract on wormchain.
func SubmitVAAToContract(
ctx context.Context,
logger *zap.Logger,
wormchainConn GatewayRelayerWormchainConn,
contract string,
v *vaa.VAA,
) (*sdktx.BroadcastTxResponse, error) {
logger.Info("submitting VAA to contract", zap.String("message_id", v.MessageID()))
vaaBytes, err := v.Marshal()
if err != nil {
return nil, fmt.Errorf("failed to marshal vaa: %w", err)
}
msgData := completeTransferAndConvertMsg{
Params: completeTransferAndConvertParams{
VAA: vaaBytes,
},
}
msgBytes, err := json.Marshal(msgData)
if err != nil {
return nil, fmt.Errorf("failed to marshal request: %w", err)
}
subMsg := wasmdtypes.MsgExecuteContract{
Sender: wormchainConn.SenderAddress(),
Contract: contract,
Msg: msgBytes,
Funds: sdktypes.Coins{},
}
start := time.Now()
txResp, err := wormchainConn.SignAndBroadcastTx(ctx, &subMsg)
if err != nil {
return txResp, fmt.Errorf("failed to send broadcast: %w", err)
}
if txResp == nil {
return txResp, fmt.Errorf("sent broadcast but returned txResp is nil")
}
if txResp.TxResponse == nil {
return txResp, fmt.Errorf("sent broadcast but returned txResp.TxResponse is nil")
}
if txResp.TxResponse.RawLog == "" {
return txResp, fmt.Errorf("sent broadcast but raw_log is not set, unable to analyze the result")
}
if strings.Contains(txResp.TxResponse.RawLog, "out of gas") {
return txResp, fmt.Errorf("out of gas: %s", txResp.TxResponse.RawLog)
}
if strings.Contains(txResp.TxResponse.RawLog, "failed to execute message") {
return txResp, fmt.Errorf("submit failed: %s", txResp.TxResponse.RawLog)
}
logger.Info("done sending broadcast", zap.String("msgId", v.MessageID()), zap.Int64("gasUsed", txResp.TxResponse.GasUsed), zap.Stringer("elapsedTime", time.Since(start)))
logger.Debug("in SubmitVAAToContract, done sending broadcast", zap.String("resp", wormchainConn.BroadcastTxResponseToString(txResp)))
return txResp, nil
}

View File

@ -0,0 +1,68 @@
package gwrelayer
import (
"bytes"
"encoding/hex"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
)
func Test_convertBech32AddressToWormhole(t *testing.T) {
expectedAddress, err := hex.DecodeString("ade4a5f5803a439835c636395a8d648dee57b2fc90d98dc17fa887159b69638b")
require.NoError(t, err)
// Basic success case.
targetAddress, err := convertBech32AddressToWormhole("wormhole14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9srrg465")
require.NoError(t, err)
assert.Equal(t, true, bytes.Equal(expectedAddress, targetAddress.Bytes()))
// Garbage in should generate an error.
_, err = convertBech32AddressToWormhole("hello world!")
assert.Error(t, err)
// Empty input should generate an error.
_, err = convertBech32AddressToWormhole("")
assert.Error(t, err)
}
func Test_shouldPublish(t *testing.T) {
type Test struct {
label string
payload []byte
result bool
err bool
}
tests := []Test{
{label: "empty payload", payload: []byte{}, result: false, err: false},
{label: "non-transfer", payload: []byte{0x0}, result: false, err: false},
{label: "payload type 1", payload: []byte{0x1}, result: false, err: false},
{label: "payload too short", payload: []byte{0x3, 0x00, 0x00}, result: false, err: true},
{label: "wrong target chain", payload: decodeBytes("0300000000000000000000000000000000000000000000000000000000000000640000000000000000000000005425890298aed601595a70ab815c96711a31bc6500066d9ae6b2d333c1d65301a59da3eed388ca5dc60cb12496584b75cbe6b15fdbed0020000000000000000000000000e6990c7e206d418d62b9e50c8e61f59dc360183b7b2262617369635f726563697069656e74223a7b22726563697069656e74223a22633256704d57786c656d3179636d31336348687865575679626e6c344d33706a595768735a4756715958686e4f485a364f484e774d32526f227d7d"), result: false, err: false},
{label: "wrong target address", payload: decodeBytes("0300000000000000000000000000000000000000000000000000000000000000640000000000000000000000005425890298aed601595a70ab815c96711a31bc6500066d9ae6b2d333c1d65301a59da3eed388ca5dc60cb12496584b75cbe6b15fdbed0C20000000000000000000000000e6990c7e206d418d62b9e50c8e61f59dc360183b7b2262617369635f726563697069656e74223a7b22726563697069656e74223a22633256704d57786c656d3179636d31336348687865575679626e6c344d33706a595768735a4756715958686e4f485a364f484e774d32526f227d7d"), result: false, err: false},
{label: "should publish", payload: decodeBytes("0300000000000000000000000000000000000000000000000000000000000000640000000000000000000000005425890298aed601595a70ab815c96711a31bc650006ade4a5f5803a439835c636395a8d648dee57b2fc90d98dc17fa887159b69638b0C20000000000000000000000000e6990c7e206d418d62b9e50c8e61f59dc360183b7b2262617369635f726563697069656e74223a7b22726563697069656e74223a22633256704d57786c656d3179636d31336348687865575679626e6c344d33706a595768735a4756715958686e4f485a364f484e774d32526f227d7d"), result: true, err: false},
}
targetAddress, err := convertBech32AddressToWormhole("wormhole14hj2tavq8fpesdwxxcu44rty3hh90vhujrvcmstl4zr3txmfvw9srrg465")
require.NoError(t, err)
for _, tc := range tests {
t.Run(string(tc.label), func(t *testing.T) {
result, err := shouldPublish(tc.payload, vaa.ChainIDWormchain, targetAddress)
assert.Equal(t, tc.err, err != nil)
assert.Equal(t, tc.result, result)
})
}
}
func decodeBytes(s string) []byte {
b, err := hex.DecodeString(s)
if err != nil {
panic(err)
}
return b
}

View File

@ -9,6 +9,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/gwrelayer"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"
@ -43,6 +44,7 @@ type G struct {
gst *common.GuardianSetState
acct *accountant.Accountant
gov *governor.ChainGovernor
gatewayRelayer *gwrelayer.GatewayRelayer
attestationEvents *reporter.AttestationEventReporter
publicrpcServer *grpc.Server
@ -175,6 +177,13 @@ func (g *G) Run(rootCtxCancel context.CancelFunc, options ...*GuardianOption) su
}
}
if g.gatewayRelayer != nil {
logger.Info("Starting gateway relayer")
if err := g.gatewayRelayer.Start(ctx); err != nil {
logger.Fatal("failed to start gateway relayer", zap.Error(err), zap.String("component", "gwrelayer"))
}
}
// Start any other runnables
for name, runnable := range g.runnables {
if err := supervisor.Run(ctx, name, runnable); err != nil {

View File

@ -186,6 +186,7 @@ func mockGuardianRunnable(t testing.TB, gs []*mockGuardian, mockGuardianIndex ui
GuardianOptionWatchers(watcherConfigs, nil),
GuardianOptionNoAccountant(), // disable accountant
GuardianOptionGovernor(true),
GuardianOptionGatewayRelayer("", nil), // disable gateway relayer
GuardianOptionP2P(gs[mockGuardianIndex].p2pKey, networkID, bootstrapPeers, nodeName, false, cfg.p2pPort, func() string { return "" }),
GuardianOptionPublicRpcSocket(cfg.publicSocket, publicRpcLogDetail),
GuardianOptionPublicrpcTcpService(cfg.publicRpc, publicRpcLogDetail),

View File

@ -12,6 +12,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/governor"
"github.com/certusone/wormhole/node/pkg/gwrelayer"
"github.com/certusone/wormhole/node/pkg/p2p"
"github.com/certusone/wormhole/node/pkg/processor"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
@ -41,7 +42,7 @@ type GuardianOption struct {
func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId string, bootstrapPeers string, nodeName string, disableHeartbeatVerify bool, port uint, ibcFeaturesFunc func() string) *GuardianOption {
return &GuardianOption{
name: "p2p",
dependencies: []string{"accountant", "governor"},
dependencies: []string{"accountant", "governor", "gateway-relayer"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
components := p2p.DefaultComponents()
components.Port = port
@ -71,6 +72,7 @@ func GuardianOptionP2P(p2pKey libp2p_crypto.PrivKey, networkId string, bootstrap
nil,
components,
ibcFeaturesFunc,
(g.gatewayRelayer != nil),
)
return nil
@ -152,6 +154,25 @@ func GuardianOptionGovernor(governorEnabled bool) *GuardianOption {
}}
}
// GuardianOptionGatewayRelayer configures the Gateway Relayer module. If the gateway relayer smart contract is configured, we will instantiate
// the GatewayRelayer and signed VAAs will be passed to it for processing when they are published. It will forward payload three transfers destined
// for the specified contract on wormchain to that contract.
func GuardianOptionGatewayRelayer(gatewayRelayerContract string, wormchainConn *wormconn.ClientConn) *GuardianOption {
return &GuardianOption{
name: "gateway-relayer",
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
g.gatewayRelayer = gwrelayer.NewGatewayRelayer(
ctx,
logger,
gatewayRelayerContract,
wormchainConn,
g.env,
)
return nil
}}
}
// GuardianOptionStatusServer configures the status server, including /readyz and /metrics.
// If g.env == common.UnsafeDevNet || g.env == common.GoTest, pprof will be enabled under /debug/pprof/
// Dependencies: none
@ -450,7 +471,7 @@ func GuardianOptionProcessor() *GuardianOption {
return &GuardianOption{
name: "processor",
// governor and accountant may be set to nil, but that choice needs to be made before the processor is configured
dependencies: []string{"db", "governor", "accountant"},
dependencies: []string{"db", "governor", "accountant", "gateway-relayer"},
f: func(ctx context.Context, logger *zap.Logger, g *G) error {
@ -469,6 +490,7 @@ func GuardianOptionProcessor() *GuardianOption {
g.gov,
g.acct,
g.acctC.readC,
g.gatewayRelayer,
).Run
return nil

View File

@ -203,6 +203,7 @@ func Run(
signedGovSt chan *gossipv1.SignedChainGovernorStatus,
components *Components,
ibcFeaturesFunc func() string,
gatewayRelayerEnabled bool,
) func(ctx context.Context) error {
if components == nil {
components = DefaultComponents()
@ -379,6 +380,9 @@ func Run(
features = append(features, ibcFlags)
}
}
if gatewayRelayerEnabled {
features = append(features, "gwrelayer")
}
heartbeat := &gossipv1.Heartbeat{
NodeName: nodeName,

View File

@ -183,6 +183,7 @@ func startGuardian(t *testing.T, ctx context.Context, g *G) {
g.signedGovCfg,
g.signedGovSt,
g.components,
nil, // ibc feature string
nil, // ibc feature string
false, // gateway relayer enabled
))
}

View File

@ -87,4 +87,8 @@ func (p *Processor) broadcastSignedVAA(v *vaa.VAA) {
}
p.gossipSendC <- msg
if p.gatewayRelayer != nil {
p.gatewayRelayer.SubmitVAA(v)
}
}

View File

@ -15,6 +15,7 @@ import (
"github.com/certusone/wormhole/node/pkg/accountant"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/gwrelayer"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/reporter"
"github.com/certusone/wormhole/node/pkg/supervisor"
@ -126,10 +127,11 @@ type Processor struct {
// gk pk as eth address
ourAddr ethcommon.Address
governor *governor.ChainGovernor
acct *accountant.Accountant
acctReadC <-chan *common.MessagePublication
pythnetVaas map[string]PythNetVaaEntry
governor *governor.ChainGovernor
acct *accountant.Accountant
acctReadC <-chan *common.MessagePublication
pythnetVaas map[string]PythNetVaaEntry
gatewayRelayer *gwrelayer.GatewayRelayer
}
var (
@ -164,6 +166,7 @@ func NewProcessor(
g *governor.ChainGovernor,
acct *accountant.Accountant,
acctReadC <-chan *common.MessagePublication,
gatewayRelayer *gwrelayer.GatewayRelayer,
) *Processor {
return &Processor{
@ -180,13 +183,14 @@ func NewProcessor(
attestationEvents: attestationEvents,
logger: supervisor.Logger(ctx),
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
governor: g,
acct: acct,
acctReadC: acctReadC,
pythnetVaas: make(map[string]PythNetVaaEntry),
logger: supervisor.Logger(ctx),
state: &aggregationState{observationMap{}},
ourAddr: crypto.PubkeyToAddress(gk.PublicKey),
governor: g,
acct: acct,
acctReadC: acctReadC,
pythnetVaas: make(map[string]PythNetVaaEntry),
gatewayRelayer: gatewayRelayer,
}
}