2022-09-07 11:43:05 -07:00
package main
import (
"context"
2022-11-21 06:18:33 -08:00
"flag"
2023-07-24 07:24:39 -07:00
"log"
2023-06-26 08:47:22 -07:00
"strconv"
2023-02-01 04:20:10 -08:00
"strings"
2023-08-07 12:05:08 -07:00
"time"
2022-11-08 11:03:43 -08:00
2022-09-07 11:43:05 -07:00
"fmt"
2022-11-08 06:53:12 -08:00
"os"
2023-05-10 11:49:40 -07:00
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
2023-01-05 11:40:24 -08:00
"github.com/go-redis/redis/v8"
2023-06-22 14:59:23 -07:00
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
2023-08-07 12:05:08 -07:00
"github.com/wormhole-foundation/wormhole-explorer/common/dbutil"
2023-03-06 09:36:40 -08:00
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
2023-03-23 11:36:50 -07:00
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
2023-01-31 06:38:17 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/config"
2022-11-21 06:18:33 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/deduplicator"
2022-11-16 10:48:16 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/guardiansets"
2023-06-22 14:59:23 -07:00
flyAlert "github.com/wormhole-foundation/wormhole-explorer/fly/internal/alert"
2023-02-13 12:28:34 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/health"
2023-06-26 08:47:22 -07:00
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/metrics"
2022-11-21 06:18:33 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs"
2022-11-16 10:48:16 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/migration"
2023-01-05 11:40:24 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/notifier"
2022-11-21 06:18:33 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/processor"
2023-10-18 07:18:32 -07:00
"github.com/wormhole-foundation/wormhole-explorer/fly/producer"
2022-11-21 06:18:33 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/queue"
2022-12-05 12:41:37 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/server"
2022-11-16 10:48:16 -08:00
"github.com/wormhole-foundation/wormhole-explorer/fly/storage"
2023-06-26 08:47:22 -07:00
"google.golang.org/protobuf/proto"
2022-11-16 10:48:16 -08:00
2022-09-07 11:43:05 -07:00
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/p2p"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/supervisor"
2022-11-21 06:18:33 -08:00
"github.com/dgraph-io/ristretto"
"github.com/eko/gocache/v3/cache"
"github.com/eko/gocache/v3/store"
2022-09-07 11:43:05 -07:00
eth_common "github.com/ethereum/go-ethereum/common"
2022-10-22 17:58:38 -07:00
crypto2 "github.com/ethereum/go-ethereum/crypto"
2022-09-07 11:43:05 -07:00
"github.com/libp2p/go-libp2p-core/crypto"
2022-11-08 06:53:12 -08:00
"github.com/wormhole-foundation/wormhole/sdk/vaa"
2022-09-07 11:43:05 -07:00
"go.uber.org/zap"
"github.com/joho/godotenv"
)
// Package-level state shared between main and its goroutines.
var (
	// rootCtx is the node's main lifecycle context; main assigns it at startup.
	rootCtx context.Context
	// rootCtxCancel cancels rootCtx.
	rootCtxCancel context.CancelFunc

	// nodeKeyPath is the path of the p2p node key file.
	nodeKeyPath string
	// logLevel is the level handed to the logger when it is created.
	logLevel string
)
2022-11-21 06:18:33 -08:00
// getenv returns the value of the environment variable named key.
// An unset or empty variable is reported as an error naming the key.
func getenv(key string) (string, error) {
	if value := os.Getenv(key); value != "" {
		return value, nil
	}
	return "", fmt.Errorf("[%s] env is required", key)
}
// TODO refactor to another file/package
2023-05-10 11:49:40 -07:00
func newAwsConfig ( ctx context . Context ) ( aws . Config , error ) {
2022-11-21 06:18:33 -08:00
region , err := getenv ( "AWS_REGION" )
if err != nil {
2023-05-10 11:49:40 -07:00
return * aws . NewConfig ( ) , err
2022-11-21 06:18:33 -08:00
}
awsSecretId , _ := getenv ( "AWS_ACCESS_KEY_ID" )
awsSecretKey , _ := getenv ( "AWS_SECRET_ACCESS_KEY" )
if awsSecretId != "" && awsSecretKey != "" {
2023-05-10 11:49:40 -07:00
credentials := credentials . NewStaticCredentialsProvider ( awsSecretId , awsSecretKey , "" )
customResolver := aws . EndpointResolverFunc ( func ( service , region string ) ( aws . Endpoint , error ) {
awsEndpoint , _ := getenv ( "AWS_ENDPOINT" )
if awsEndpoint != "" {
return aws . Endpoint {
PartitionID : "aws" ,
URL : awsEndpoint ,
SigningRegion : region ,
} , nil
}
return aws . Endpoint { } , & aws . EndpointNotFoundError { }
} )
awsCfg , err := awsconfig . LoadDefaultConfig ( ctx ,
awsconfig . WithRegion ( region ) ,
awsconfig . WithEndpointResolver ( customResolver ) ,
awsconfig . WithCredentialsProvider ( credentials ) ,
)
return awsCfg , err
2022-11-21 06:18:33 -08:00
}
2023-05-10 11:49:40 -07:00
return awsconfig . LoadDefaultConfig ( ctx , awsconfig . WithRegion ( region ) )
2022-11-21 06:18:33 -08:00
}
// TODO refactor to another file/package
2023-05-10 11:49:40 -07:00
func newSQSProducer ( ctx context . Context ) ( * sqs . Producer , error ) {
2022-11-21 06:18:33 -08:00
sqsURL , err := getenv ( "SQS_URL" )
if err != nil {
return nil , err
}
2023-05-10 11:49:40 -07:00
awsConfig , err := newAwsConfig ( ctx )
2022-11-21 06:18:33 -08:00
if err != nil {
return nil , err
}
2023-05-10 11:49:40 -07:00
return sqs . NewProducer ( awsConfig , sqsURL )
2022-11-21 06:18:33 -08:00
}
// TODO refactor to another file/package
2023-05-10 11:49:40 -07:00
func newSQSConsumer ( ctx context . Context ) ( * sqs . Consumer , error ) {
2022-11-21 06:18:33 -08:00
sqsURL , err := getenv ( "SQS_URL" )
if err != nil {
return nil , err
}
2023-05-10 11:49:40 -07:00
awsConfig , err := newAwsConfig ( ctx )
2022-11-21 06:18:33 -08:00
if err != nil {
return nil , err
}
2023-05-10 11:49:40 -07:00
return sqs . NewConsumer ( awsConfig , sqsURL ,
2022-11-21 06:18:33 -08:00
sqs . WithMaxMessages ( 10 ) ,
sqs . WithVisibilityTimeout ( 120 ) )
}
// newCache creates a ristretto-backed in-memory cache of bool values.
//
// TODO refactor to another file/package
func newCache() (cache.CacheInterface[bool], error) {
	ristrettoCfg := &ristretto.Config{
		NumCounters: 10000,           // Number of keys to track frequency of.
		MaxCost:     10 * (1 << 20), // Maximum cost of cache (10 MB).
		BufferItems: 64,              // Number of keys per Get buffer.
	}

	backend, err := ristretto.NewCache(ristrettoCfg)
	if err != nil {
		return nil, err
	}

	return cache.New[bool](store.NewRistretto(backend)), nil
}
// Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue)
// callback to obtain queue messages from a queue
// callback to publish vaa non pyth messages to a sink
2023-05-10 11:49:40 -07:00
func newVAAConsumePublish ( ctx context . Context , isLocal bool , logger * zap . Logger ) ( * sqs . Consumer , processor . VAAQueueConsumeFunc , processor . VAAPushFunc ) {
2022-11-21 06:18:33 -08:00
if isLocal {
vaaQueue := queue . NewVAAInMemory ( )
2022-12-05 12:41:37 -08:00
return nil , vaaQueue . Consume , vaaQueue . Publish
2022-11-21 06:18:33 -08:00
}
2023-05-10 11:49:40 -07:00
sqsProducer , err := newSQSProducer ( ctx )
2022-11-21 06:18:33 -08:00
if err != nil {
logger . Fatal ( "could not create sqs producer" , zap . Error ( err ) )
}
2023-05-10 11:49:40 -07:00
sqsConsumer , err := newSQSConsumer ( ctx )
2022-11-21 06:18:33 -08:00
if err != nil {
logger . Fatal ( "could not create sqs consumer" , zap . Error ( err ) )
}
vaaQueue := queue . NewVAASQS ( sqsProducer , sqsConsumer , logger )
2022-12-05 12:41:37 -08:00
return sqsConsumer , vaaQueue . Consume , vaaQueue . Publish
2022-11-21 06:18:33 -08:00
}
2023-01-05 11:40:24 -08:00
func newVAANotifierFunc ( isLocal bool , logger * zap . Logger ) processor . VAANotifyFunc {
if isLocal {
return func ( context . Context , * vaa . VAA , [ ] byte ) error {
return nil
}
}
redisUri , err := getenv ( "REDIS_URI" )
if err != nil {
logger . Fatal ( "could not create vaa notifier " , zap . Error ( err ) )
}
2023-07-03 11:10:52 -07:00
redisPrefix , err := getenv ( "REDIS_PREFIX" )
if err != nil {
logger . Fatal ( "could not create vaa notifier " , zap . Error ( err ) )
}
logger . Info ( "using redis notifier" , zap . String ( "prefix" , redisPrefix ) )
2023-01-05 11:40:24 -08:00
client := redis . NewClient ( & redis . Options { Addr : redisUri } )
2023-07-03 11:10:52 -07:00
return notifier . NewLastSequenceNotifier ( client , redisPrefix ) . Notify
2023-01-05 11:40:24 -08:00
}
2023-06-22 14:59:23 -07:00
// newAlertClient builds the alert client from the alert configuration.
// It returns a dummy client when alerting is disabled, and the configuration
// error unchanged when the configuration cannot be loaded.
func newAlertClient() (alert.AlertClient, error) {
	alertCfg, err := config.GetAlertConfig()
	switch {
	case err != nil:
		return nil, err
	case !alertCfg.Enabled:
		return alert.NewDummyClient(), nil
	default:
		return alert.NewAlertService(alertCfg, flyAlert.LoadAlerts)
	}
}
2023-07-03 11:10:52 -07:00
func newMetrics ( enviroment string ) metrics . Metrics {
2023-06-28 09:07:45 -07:00
metricsEnabled := config . GetMetricsEnabled ( )
if ! metricsEnabled {
2023-06-26 08:47:22 -07:00
return metrics . NewDummyMetrics ( )
}
2023-07-03 11:10:52 -07:00
return metrics . NewPrometheusMetrics ( enviroment )
2023-06-26 08:47:22 -07:00
}
2023-10-18 07:18:32 -07:00
// newVAARedisProducerFunc creates a callback to publish VAA notification
// events to a redis pubsub channel.
//
// When isLocal is true the callback does nothing and returns nil. Otherwise
// REDIS_URI, REDIS_PREFIX and REDIS_VAA_CHANNEL must be set; the events are
// pushed to the channel "<REDIS_PREFIX>:<REDIS_VAA_CHANNEL>".
//
// A missing environment variable is returned as an error so that the caller
// reports it with its own context, instead of exiting here with a message
// that refers to the unrelated vaa notifier. ctx is currently unused.
func newVAARedisProducerFunc(ctx context.Context, isLocal bool, logger *zap.Logger) (producer.PushFunc, error) {
	if isLocal {
		return func(context.Context, *producer.NotificationEvent) error {
			return nil
		}, nil
	}

	redisURI, err := getenv("REDIS_URI")
	if err != nil {
		return nil, err
	}

	redisPrefix, err := getenv("REDIS_PREFIX")
	if err != nil {
		return nil, err
	}

	redisChannel, err := getenv("REDIS_VAA_CHANNEL")
	if err != nil {
		return nil, err
	}

	channel := fmt.Sprintf("%s:%s", redisPrefix, redisChannel)
	logger.Info("using redis producer", zap.String("channel", channel))

	client := redis.NewClient(&redis.Options{Addr: redisURI})
	return producer.NewRedisProducer(client, channel).Push, nil
}
2022-09-07 11:43:05 -07:00
func main ( ) {
2023-03-06 09:36:14 -08:00
//TODO: use a configuration structure to obtain the configuration
2023-08-24 11:16:43 -07:00
_ = godotenv . Load ( )
2023-03-06 09:36:14 -08:00
2023-07-24 07:24:39 -07:00
// load configuration
cfg , err := config . New ( rootCtx )
if err != nil {
log . Fatal ( "Error creating config" , err )
}
2022-10-22 17:58:38 -07:00
// Node's main lifecycle context.
rootCtx , rootCtxCancel = context . WithCancel ( context . Background ( ) )
defer rootCtxCancel ( )
2023-01-30 10:51:13 -08:00
// get p2p values to connect p2p network
2023-01-31 06:38:17 -08:00
p2pNetworkConfig , err := config . GetP2pNetwork ( )
2023-01-30 10:51:13 -08:00
if err != nil {
fmt . Println ( err )
os . Exit ( 1 )
}
2022-09-07 11:43:05 -07:00
nodeKeyPath = "/tmp/node.key"
2022-09-11 13:57:01 -07:00
logLevel = "warn"
2022-09-07 11:43:05 -07:00
common . SetRestrictiveUmask ( )
2023-03-23 11:36:50 -07:00
logger := logger . New ( "wormhole-fly" , logger . WithLevel ( logLevel ) )
2022-09-07 11:43:05 -07:00
2022-11-21 06:18:33 -08:00
isLocal := flag . Bool ( "local" , false , "a bool" )
flag . Parse ( )
2022-09-07 11:43:05 -07:00
// Verify flags
if nodeKeyPath == "" {
logger . Fatal ( "Please specify --nodeKey" )
}
2023-01-30 10:51:13 -08:00
if p2pNetworkConfig . P2pBootstrap == "" {
2022-09-07 11:43:05 -07:00
logger . Fatal ( "Please specify --bootstrap" )
}
2023-06-22 14:59:23 -07:00
// get Alert client
alertClient , err := newAlertClient ( )
if err != nil {
logger . Fatal ( "could not create alert client" , zap . Error ( err ) )
}
2023-07-03 11:10:52 -07:00
// new metrics client
2023-07-06 07:20:18 -07:00
metrics := newMetrics ( config . GetEnvironment ( ) )
2023-06-26 08:47:22 -07:00
2022-09-07 11:43:05 -07:00
// Setup DB
uri := os . Getenv ( "MONGODB_URI" )
if uri == "" {
logger . Fatal ( "You must set your 'MONGODB_URI' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable" )
}
2023-01-05 11:40:24 -08:00
databaseName := os . Getenv ( "MONGODB_DATABASE" )
if databaseName == "" {
logger . Fatal ( "You must set your 'MONGODB_DATABASE' environmental variable. See\n\t https://www.mongodb.com/docs/drivers/go/current/usage-examples/#environment-variable" )
}
2023-09-25 12:50:16 -07:00
db , err := dbutil . Connect ( rootCtx , logger , uri , databaseName , false )
2022-11-07 09:47:10 -08:00
if err != nil {
logger . Fatal ( "could not connect to DB" , zap . Error ( err ) )
}
2022-11-08 09:58:22 -08:00
2022-11-21 06:18:33 -08:00
// Run the database migration.
2023-08-07 12:05:08 -07:00
err = migration . Run ( db . Database )
2022-11-08 09:58:22 -08:00
if err != nil {
logger . Fatal ( "error running migration" , zap . Error ( err ) )
}
2023-10-18 07:18:32 -07:00
// Creates a callback to publish VAA messages to a redis pubsub
vaaRedisProducerFunc , err := newVAARedisProducerFunc ( rootCtx , * isLocal , logger )
if err != nil {
logger . Fatal ( "could not create vaa redis producer " , zap . Error ( err ) )
}
// Creates a composite callback to publish VAA messages to a redis pubsub
producerFunc := producer . NewComposite ( vaaRedisProducerFunc )
// Creates a callback to publish VAA messages to a redis pubsub
repository := storage . NewRepository ( alertClient , metrics , db . Database , producerFunc , logger )
2022-09-07 11:43:05 -07:00
// Outbound gossip message queue
sendC := make ( chan [ ] byte )
// Inbound observations
2023-08-24 11:16:43 -07:00
obsvC := make ( chan * common . MsgWithTimeStamp [ gossipv1 . SignedObservation ] , cfg . ObservationsChannelSize )
2022-09-07 11:43:05 -07:00
2023-07-24 07:24:39 -07:00
// Inbound observation requests - we don't add a environment because we are going to delete this channel
2022-09-15 12:17:28 -07:00
obsvReqC := make ( chan * gossipv1 . ObservationRequest , 50 )
2022-09-07 11:43:05 -07:00
// Inbound signed VAAs
2023-07-24 07:24:39 -07:00
signedInC := make ( chan * gossipv1 . SignedVAAWithQuorum , cfg . VaasChannelSize )
2022-09-07 11:43:05 -07:00
// Heartbeat updates
2023-07-24 07:24:39 -07:00
heartbeatC := make ( chan * gossipv1 . Heartbeat , cfg . HeartbeatsChannelSize )
2022-09-07 11:43:05 -07:00
// Guardian set state managed by processor
gst := common . NewGuardianSetState ( heartbeatC )
2022-11-08 06:53:12 -08:00
// Governor cfg
2023-07-24 07:24:39 -07:00
govConfigC := make ( chan * gossipv1 . SignedChainGovernorConfig , cfg . GovernorConfigChannelSize )
2022-11-08 06:53:12 -08:00
// Governor status
2023-07-24 07:24:39 -07:00
govStatusC := make ( chan * gossipv1 . SignedChainGovernorStatus , cfg . GovernorStatusChannelSize )
2023-01-31 06:38:17 -08:00
2022-09-07 11:43:05 -07:00
// Bootstrap guardian set, otherwise heartbeats would be skipped
2022-09-11 13:57:01 -07:00
// TODO: fetch this and probably figure out how to update it live
2023-07-25 11:39:27 -07:00
guardianSetHistory := guardiansets . GetByEnv ( p2pNetworkConfig . Enviroment , alertClient )
2023-02-02 05:17:42 -08:00
gsLastet := guardianSetHistory . GetLatest ( )
2023-01-31 06:38:17 -08:00
gst . Set ( & gsLastet )
2022-09-07 11:43:05 -07:00
2022-10-22 17:58:38 -07:00
// Ignore observation requests
// Note: without this, the whole program hangs on observation requests
discardMessages ( rootCtx , obsvReqC )
2023-08-08 07:11:35 -07:00
maxHealthTimeSeconds := config . GetMaxHealthTimeSeconds ( )
guardianCheck := health . NewGuardianCheck ( maxHealthTimeSeconds )
2022-10-22 17:58:38 -07:00
2022-09-15 12:17:28 -07:00
// Log observations
2022-09-07 11:43:05 -07:00
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
2023-08-24 11:16:43 -07:00
case m := <- obsvC :
o := m . Msg
2023-08-08 07:11:35 -07:00
guardianCheck . Ping ( rootCtx )
2023-06-26 08:47:22 -07:00
metrics . IncObservationTotal ( )
2022-10-22 17:58:38 -07:00
ok := verifyObservation ( logger , o , gst . Get ( ) )
if ! ok {
logger . Error ( "Could not verify observation" , zap . String ( "id" , o . MessageId ) )
2022-11-07 09:47:10 -08:00
continue
2022-10-22 17:58:38 -07:00
}
2023-02-01 04:20:10 -08:00
2023-06-26 08:47:22 -07:00
// get chainID from observationID.
chainID , err := getObservationChainID ( logger , o )
if err != nil {
logger . Error ( "Error getting chainID" , zap . Error ( err ) )
continue
}
metrics . IncObservationFromGossipNetwork ( chainID )
2023-02-01 04:20:10 -08:00
// apply filter observations by env.
if filterObservationByEnv ( o , p2pNetworkConfig . Enviroment ) {
continue
}
2023-06-26 08:47:22 -07:00
metrics . IncObservationUnfiltered ( chainID )
err = repository . UpsertObservation ( o )
2022-09-07 11:43:05 -07:00
if err != nil {
logger . Error ( "Error inserting observation" , zap . Error ( err ) )
}
}
}
} ( )
// Log signed VAAs
2022-11-21 06:18:33 -08:00
cache , err := newCache ( )
if err != nil {
logger . Fatal ( "could not create cache" , zap . Error ( err ) )
}
2023-01-05 11:40:24 -08:00
isLocalFlag := isLocal != nil && * isLocal
2022-11-21 06:18:33 -08:00
// Creates a deduplicator to discard VAA messages that were processed previously
deduplicator := deduplicator . New ( cache , logger )
// Creates two callbacks
2023-05-10 11:49:40 -07:00
sqsConsumer , vaaQueueConsume , nonPythVaaPublish := newVAAConsumePublish ( rootCtx , isLocalFlag , logger )
2023-01-05 11:40:24 -08:00
// Create a vaa notifier
notifierFunc := newVAANotifierFunc ( isLocalFlag , logger )
2022-11-21 06:18:33 -08:00
// Creates a instance to consume VAA messages from Gossip network and handle the messages
// When recive a message, the message filter by deduplicator
// if VAA is from pyhnet should be saved directly to repository
// if VAA is from non pyhnet should be publish with nonPythVaaPublish
2023-06-26 08:47:22 -07:00
vaaGossipConsumer := processor . NewVAAGossipConsumer ( & guardianSetHistory , deduplicator , nonPythVaaPublish , repository . UpsertVaa , metrics , logger )
2022-11-21 06:18:33 -08:00
// Creates a instance to consume VAA messages (non pyth) from a queue and store in a storage
2023-06-26 08:47:22 -07:00
vaaQueueConsumer := processor . NewVAAQueueConsumer ( vaaQueueConsume , repository , notifierFunc , metrics , logger )
2022-11-21 06:18:33 -08:00
// Creates a wrapper that splits the incoming VAAs into 2 channels (pyth to non pyth) in order
// to be able to process them in a differentiated way
vaaGossipConsumerSplitter := processor . NewVAAGossipSplitterConsumer ( vaaGossipConsumer . Push , logger )
vaaQueueConsumer . Start ( rootCtx )
vaaGossipConsumerSplitter . Start ( rootCtx )
2022-12-05 12:41:37 -08:00
// start fly http server.
2023-02-03 10:18:44 -08:00
pprofEnabled := config . GetPprofEnabled ( )
2023-08-23 06:19:16 -07:00
server := server . NewServer ( cfg . ApiPort , guardianCheck , logger , repository , sqsConsumer , * isLocal , pprofEnabled , alertClient )
2022-12-05 12:41:37 -08:00
server . Start ( )
2022-09-07 11:43:05 -07:00
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
2022-10-22 17:58:38 -07:00
case sVaa := <- signedInC :
2023-08-08 07:11:35 -07:00
guardianCheck . Ping ( rootCtx )
2023-06-26 08:47:22 -07:00
metrics . IncVaaTotal ( )
2022-10-22 17:58:38 -07:00
v , err := vaa . Unmarshal ( sVaa . Vaa )
if err != nil {
logger . Error ( "Error unmarshalling vaa" , zap . Error ( err ) )
continue
}
2023-02-01 04:20:10 -08:00
2023-06-26 08:47:22 -07:00
metrics . IncVaaFromGossipNetwork ( v . EmitterChain )
2023-02-01 04:20:10 -08:00
// apply filter observations by env.
if filterVaasByEnv ( v , p2pNetworkConfig . Enviroment ) {
continue
}
2022-11-21 06:18:33 -08:00
// Push an incoming VAA to be processed
if err := vaaGossipConsumerSplitter . Push ( rootCtx , v , sVaa . Vaa ) ; err != nil {
2022-09-07 11:43:05 -07:00
logger . Error ( "Error inserting vaa" , zap . Error ( err ) )
}
}
}
} ( )
2022-09-15 12:17:28 -07:00
// Log heartbeats
2023-02-13 12:28:34 -08:00
go func ( guardianCheck * health . GuardianCheck ) {
2022-09-07 11:43:05 -07:00
for {
select {
case <- rootCtx . Done ( ) :
return
2022-09-11 13:57:01 -07:00
case hb := <- heartbeatC :
2023-08-08 07:11:35 -07:00
guardianCheck . Ping ( rootCtx )
2023-06-26 08:47:22 -07:00
metrics . IncHeartbeatFromGossipNetwork ( hb . NodeName )
2022-10-22 17:58:38 -07:00
err := repository . UpsertHeartbeat ( hb )
2022-09-07 11:43:05 -07:00
if err != nil {
logger . Error ( "Error inserting heartbeat" , zap . Error ( err ) )
2023-06-26 08:47:22 -07:00
} else {
metrics . IncHeartbeatInserted ( hb . NodeName )
2022-09-07 11:43:05 -07:00
}
}
}
2023-02-13 12:28:34 -08:00
} ( guardianCheck )
2022-09-07 11:43:05 -07:00
2022-11-08 06:53:12 -08:00
// Log govConfigs
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
case govConfig := <- govConfigC :
2023-08-08 07:11:35 -07:00
guardianCheck . Ping ( rootCtx )
2023-06-26 08:47:22 -07:00
nodeName , err := getGovernorConfigNodeName ( govConfig )
if err != nil {
logger . Error ( "Error getting gov config node name" , zap . Error ( err ) )
continue
}
metrics . IncGovernorConfigFromGossipNetwork ( nodeName )
err = repository . UpsertGovernorConfig ( govConfig )
2022-11-08 06:53:12 -08:00
if err != nil {
logger . Error ( "Error inserting gov config" , zap . Error ( err ) )
2023-06-26 08:47:22 -07:00
} else {
metrics . IncGovernorConfigInserted ( nodeName )
2022-11-08 06:53:12 -08:00
}
}
}
} ( )
// Log govStatus
go func ( ) {
for {
select {
case <- rootCtx . Done ( ) :
return
case govStatus := <- govStatusC :
2023-08-08 07:11:35 -07:00
guardianCheck . Ping ( rootCtx )
2023-06-26 08:47:22 -07:00
nodeName , err := getGovernorStatusNodeName ( govStatus )
if err != nil {
logger . Error ( "Error getting gov status node name" , zap . Error ( err ) )
continue
}
metrics . IncGovernorStatusFromGossipNetwork ( nodeName )
err = repository . UpsertGovernorStatus ( govStatus )
2022-11-08 06:53:12 -08:00
if err != nil {
logger . Error ( "Error inserting gov status" , zap . Error ( err ) )
2023-06-26 08:47:22 -07:00
} else {
metrics . IncGovernorStatusInserted ( nodeName )
2022-11-08 06:53:12 -08:00
}
}
}
} ( )
2022-09-07 11:43:05 -07:00
// Load p2p private key
var priv crypto . PrivKey
priv , err = common . GetOrCreateNodeKey ( logger , nodeKeyPath )
if err != nil {
logger . Fatal ( "Failed to load node key" , zap . Error ( err ) )
}
// Run supervisor.
supervisor . New ( rootCtx , logger , func ( ctx context . Context ) error {
2023-08-23 06:19:16 -07:00
components := p2p . DefaultComponents ( )
components . Port = cfg . P2pPort
2023-01-23 11:13:59 -08:00
if err := supervisor . Run ( ctx , "p2p" ,
2023-08-24 11:16:43 -07:00
p2p . Run ( obsvC , obsvReqC , nil , sendC , signedInC , priv , nil , gst , p2pNetworkConfig . P2pNetworkID , p2pNetworkConfig . P2pBootstrap , "" , false , rootCtxCancel , nil , nil , govConfigC , govStatusC , components , nil , false ) ) ; err != nil {
2022-09-07 11:43:05 -07:00
return err
}
logger . Info ( "Started internal services" )
<- ctx . Done ( )
return nil
} ,
// It's safer to crash and restart the process in case we encounter a panic,
// rather than attempting to reschedule the runnable.
supervisor . WithPropagatePanic )
<- rootCtx . Done ( )
2023-08-07 12:05:08 -07:00
2022-09-07 11:43:05 -07:00
// TODO: wait for things to shut down gracefully
2022-11-21 06:18:33 -08:00
vaaGossipConsumerSplitter . Close ( )
2022-12-05 12:41:37 -08:00
server . Stop ( )
2023-08-07 12:05:08 -07:00
logger . Info ( "Closing MongoDB connection..." )
db . DisconnectWithTimeout ( 10 * time . Second )
2022-09-07 11:43:05 -07:00
}
2022-10-22 17:58:38 -07:00
2023-06-26 08:47:22 -07:00
// getGovernorConfigNodeName decodes the protobuf payload carried in
// govConfig.Config and returns the node name it contains. The unmarshalling
// error is returned unchanged.
func getGovernorConfigNodeName(govConfig *gossipv1.SignedChainGovernorConfig) (string, error) {
	decoded := new(gossipv1.ChainGovernorConfig)
	if err := proto.Unmarshal(govConfig.Config, decoded); err != nil {
		return "", err
	}
	return decoded.NodeName, nil
}
// getGovernorStatusNodeName decodes the protobuf payload carried in
// govStatus.Status and returns the node name it contains. The unmarshalling
// error is returned unchanged.
func getGovernorStatusNodeName(govStatus *gossipv1.SignedChainGovernorStatus) (string, error) {
	decoded := new(gossipv1.ChainGovernorStatus)
	if err := proto.Unmarshal(govStatus.Status, decoded); err != nil {
		return "", err
	}
	return decoded.NodeName, nil
}
// getObservationChainID extracts the chain ID from an observation's message
// ID. The chain ID is the text before the first "/" (or the whole message ID
// when it has no "/"), parsed as a base-10 unsigned 16-bit number. A parse
// failure is logged and returned.
func getObservationChainID(logger *zap.Logger, obs *gossipv1.SignedObservation) (vaa.ChainID, error) {
	rawChainID, _, _ := strings.Cut(obs.MessageId, "/")

	parsed, err := strconv.ParseUint(rawChainID, 10, 16)
	if err != nil {
		logger.Error("Error parsing chainId", zap.Error(err))
		return 0, err
	}

	return vaa.ChainID(parsed), nil
}
2022-10-22 17:58:38 -07:00
// verifyObservation reports whether obs was signed by the address it claims
// and whether that address belongs to the guardian set gs.
//
// The signer's public key is recovered from the observation hash and
// signature; the signer address is built from the Keccak256 hash of the key
// (without its first byte), dropping the first 12 bytes of the hash. A
// recovery failure returns false without logging; the other failures are
// logged.
func verifyObservation(logger *zap.Logger, obs *gossipv1.SignedObservation, gs *common.GuardianSet) bool {
	pubKey, err := crypto2.Ecrecover(obs.GetHash(), obs.GetSignature())
	if err != nil {
		return false
	}

	claimedAddr := eth_common.BytesToAddress(obs.GetAddr())
	recoveredAddr := eth_common.BytesToAddress(crypto2.Keccak256(pubKey[1:])[12:])
	if claimedAddr != recoveredAddr {
		logger.Error("error validating observation, signer addr and addr don't match",
			zap.String("id", obs.MessageId),
			zap.String("obs_addr", claimedAddr.Hex()),
			zap.String("signer_addr", recoveredAddr.Hex()),
		)
		return false
	}

	if _, inGuardianSet := gs.KeyIndex(claimedAddr); !inGuardianSet {
		logger.Error("error validating observation, signer not in guardian set",
			zap.String("id", obs.MessageId),
			zap.String("obs_addr", claimedAddr.Hex()),
		)
		return false
	}

	return true
}
// discardMessages starts a goroutine that receives from obsvReqC and drops
// every value, until ctx is done. It returns immediately.
func discardMessages[T any](ctx context.Context, obsvReqC chan T) {
	drain := func() {
		done := ctx.Done()
		for {
			select {
			case <-obsvReqC:
				// The message is intentionally dropped.
			case <-done:
				return
			}
		}
	}
	go drain()
}
2023-02-01 04:20:10 -08:00
// filterObservation filter observation by enviroment.
func filterObservationByEnv ( o * gossipv1 . SignedObservation , enviroment string ) bool {
2023-03-06 09:36:40 -08:00
if enviroment == domain . P2pTestNet {
2023-07-24 07:24:39 -07:00
// filter pyth message in testnet gossip network (for solana and pyth chain).
2023-04-25 12:23:07 -07:00
if strings . Contains ( ( o . GetMessageId ( ) ) , "1/f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0" ) ||
strings . HasPrefix ( "26/" , o . GetMessageId ( ) ) {
2023-02-01 04:20:10 -08:00
return true
}
}
2023-07-24 07:24:39 -07:00
// filter pyth message in mainnet gossip network (for pyth chain).
if enviroment == domain . P2pMainNet && strings . HasPrefix ( "26/" , o . GetMessageId ( ) ) {
return true
}
2023-02-01 04:20:10 -08:00
return false
}
// filterVaasByEnv filter vaa by enviroment.
func filterVaasByEnv ( v * vaa . VAA , enviroment string ) bool {
2023-03-06 09:36:40 -08:00
if enviroment == domain . P2pTestNet {
2023-02-01 04:20:10 -08:00
vaaFromSolana := v . EmitterChain == vaa . ChainIDSolana
addressToFilter := strings . ToLower ( v . EmitterAddress . String ( ) ) == "f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0"
2023-04-25 12:23:07 -07:00
isPyth := v . EmitterChain == vaa . ChainIDPythNet
if ( vaaFromSolana && addressToFilter ) || isPyth {
2023-02-01 04:20:10 -08:00
return true
}
}
return false
}