[fly] Missing observations (#570)

Add environment variables for buffer channels (observations y vaas)
Increment size of buffer channels for observations and vaas
Align k8s resources in fly

Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
ftocal 2023-07-24 11:24:39 -03:00 committed by GitHub
parent 1d512dba7b
commit 94307b849a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 74 additions and 12 deletions

View File

@ -4,9 +4,9 @@ NAME=wormscan-fly
REPLICAS=5
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=512Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_LIMITS_CPU=700m
RESOURCES_REQUESTS_MEMORY=384Mi
RESOURCES_REQUESTS_CPU=250m
RESOURCES_REQUESTS_CPU=500m
SQS_URL=
SQS_AWS_REGION=
P2P_NETWORK=mainnet
@ -15,3 +15,8 @@ MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=2500
VAAS_CHANNEL_SIZE=150
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50

View File

@ -15,3 +15,8 @@ MAX_HEALTH_TIME_SECONDS=90
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=2500
VAAS_CHANNEL_SIZE=150
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50

View File

@ -15,3 +15,8 @@ MAX_HEALTH_TIME_SECONDS=300
AWS_IAM_ROLE=
ALERT_ENABLED=false
METRICS_ENABLED=true
OBSERVATIONS_CHANNEL_SIZE=50
VAAS_CHANNEL_SIZE=50
HEARTBEATS_CHANNEL_SIZE=50
GOVERNOR_CONFIG_CHANNEL_SIZE=50
GOVERNOR_STATUS_CHANNEL_SIZE=50

View File

@ -83,6 +83,16 @@ spec:
value: "{{ .ALERT_ENABLED }}"
- name: METRICS_ENABLED
value: "{{ .METRICS_ENABLED }}"
- name: OBSERVATIONS_CHANNEL_SIZE
value: "{{ .OBSERVATIONS_CHANNEL_SIZE }}"
- name: VAAS_CHANNEL_SIZE
value: "{{ .VAAS_CHANNEL_SIZE }}"
- name: HEARTBEATS_CHANNEL_SIZE
value: "{{ .HEARTBEATS_CHANNEL_SIZE }}"
- name: GOVERNOR_CONFIG_CHANNEL_SIZE
value: "{{ .GOVERNOR_CONFIG_CHANNEL_SIZE }}"
- name: GOVERNOR_STATUS_CHANNEL_SIZE
value: "{{ .GOVERNOR_STATUS_CHANNEL_SIZE }}"
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -1,10 +1,13 @@
package config
import (
"context"
"fmt"
"os"
"strconv"
"github.com/joho/godotenv"
"github.com/sethvargo/go-envconfig"
"github.com/wormhole-foundation/wormhole-explorer/common/client/alert"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
)
@ -119,3 +122,23 @@ func GetPrefix() string {
prefix := p2pNetwork.Enviroment + "-" + GetEnvironment()
return prefix
}
type Configuration struct {
ObservationsChannelSize int `env:"OBSERVATIONS_CHANNEL_SIZE,required"`
VaasChannelSize int `env:"VAAS_CHANNEL_SIZE,required"`
HeartbeatsChannelSize int `env:"HEARTBEATS_CHANNEL_SIZE,required"`
GovernorConfigChannelSize int `env:"GOVERNOR_CONFIG_CHANNEL_SIZE,required"`
GovernorStatusChannelSize int `env:"GOVERNOR_STATUS_CHANNEL_SIZE,required"`
}
// New creates a configuration with the values from .env file and environment variables.
func New(ctx context.Context) (*Configuration, error) {
_ = godotenv.Load(".env", "../.env")
var configuration Configuration
if err := envconfig.Process(ctx, &configuration); err != nil {
return nil, err
}
return &configuration, nil
}

View File

@ -17,6 +17,7 @@ require (
github.com/joho/godotenv v1.4.0
github.com/libp2p/go-libp2p-core v0.20.0
github.com/patrickmn/go-cache v2.1.0+incompatible
github.com/sethvargo/go-envconfig v0.9.0
github.com/stretchr/testify v1.8.1
github.com/test-go/testify v1.1.4
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8

View File

@ -2724,6 +2724,8 @@ github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfP
github.com/sergi/go-diff v1.0.0/go.mod h1:0CfEIISq7TuYL3j771MWULgwwjU+GofnZX9QAmXWZgo=
github.com/sergi/go-diff v1.1.0/go.mod h1:STckp+ISIX8hZLjrqAeVduY0gWCT9IjLuqbuNXdaHfM=
github.com/serialx/hashring v0.0.0-20190422032157-8b2912629002/go.mod h1:/yeG0My1xr/u+HZrFQ1tOQQQQrOawfyMUH13ai5brBc=
github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE=
github.com/sethvargo/go-envconfig v0.9.0/go.mod h1:Iz1Gy1Sf3T64TQlJSvee81qDhf7YIlt8GMUX6yyNFs0=
github.com/shazow/go-diff v0.0.0-20160112020656-b6b7b6733b8c/go.mod h1:/PevMnwAxekIXwN8qQyfc5gl2NlkB3CQlkizAbOkeBs=
github.com/shirou/gopsutil v0.0.0-20190901111213-e4ec7b275ada/go.mod h1:WWnYX4lzhCH5h/3YBfyVA3VbLYjlMZZAQcW9ojMexNc=
github.com/shirou/gopsutil v2.20.5+incompatible/go.mod h1:5b4v6he4MtMOwMlS0TUMTu2PcXUg8+E1lC7eC3UO/RA=

View File

@ -3,6 +3,7 @@ package main
import (
"context"
"flag"
"log"
"strconv"
"strings"
@ -215,6 +216,12 @@ func main() {
fmt.Println("No .env file found")
}
// load configuration
cfg, err := config.New(rootCtx)
if err != nil {
log.Fatal("Error creating config", err)
}
// Node's main lifecycle context.
rootCtx, rootCtxCancel = context.WithCancel(context.Background())
defer rootCtxCancel()
@ -280,25 +287,25 @@ func main() {
sendC := make(chan []byte)
// Inbound observations
obsvC := make(chan *gossipv1.SignedObservation, 50)
obsvC := make(chan *gossipv1.SignedObservation, cfg.ObservationsChannelSize)
// Inbound observation requests
// Inbound observation requests - we don't add a environment because we are going to delete this channel
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
// Inbound signed VAAs
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, cfg.VaasChannelSize)
// Heartbeat updates
heartbeatC := make(chan *gossipv1.Heartbeat, 50)
heartbeatC := make(chan *gossipv1.Heartbeat, cfg.HeartbeatsChannelSize)
// Guardian set state managed by processor
gst := common.NewGuardianSetState(heartbeatC)
// Governor cfg
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, 50)
govConfigC := make(chan *gossipv1.SignedChainGovernorConfig, cfg.GovernorConfigChannelSize)
// Governor status
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, 50)
govStatusC := make(chan *gossipv1.SignedChainGovernorStatus, cfg.GovernorStatusChannelSize)
// Bootstrap guardian set, otherwise heartbeats would be skipped
// TODO: fetch this and probably figure out how to update it live
@ -575,12 +582,16 @@ func discardMessages[T any](ctx context.Context, obsvReqC chan T) {
// filterObservation filter observation by enviroment.
func filterObservationByEnv(o *gossipv1.SignedObservation, enviroment string) bool {
if enviroment == domain.P2pTestNet {
// filter pyth message in test enviroment (for solana and pyth chain).
// filter pyth message in testnet gossip network (for solana and pyth chain).
if strings.Contains((o.GetMessageId()), "1/f346195ac02f37d60d4db8ffa6ef74cb1be3550047543a4a9ee9acf4d78697b0") ||
strings.HasPrefix("26/", o.GetMessageId()) {
return true
}
}
// filter pyth message in mainnet gossip network (for pyth chain).
if enviroment == domain.P2pMainNet && strings.HasPrefix("26/", o.GetMessageId()) {
return true
}
return false
}