diff --git a/Tiltfile b/Tiltfile index 37212d59..394cb6ba 100644 --- a/Tiltfile +++ b/Tiltfile @@ -70,6 +70,9 @@ if fly: k8s_resource( "fly", + port_forwards = [ + port_forward(8001, name = "Server [:8001]", host = webHost), + ], resource_deps = ["mongo"] ) diff --git a/api/handlers/infraestructure/controller.go b/api/handlers/infraestructure/controller.go new file mode 100644 index 00000000..7cd9c547 --- /dev/null +++ b/api/handlers/infraestructure/controller.go @@ -0,0 +1,33 @@ +package infraestructure + +import "github.com/gofiber/fiber/v2" + +// Controller definition. +type Controller struct { + srv *Service +} + +// NewController creates a Controller instance. +func NewController(serv *Service) *Controller { + return &Controller{srv: serv} +} + +// HealthCheck handler for the endpoint /health. +func (c *Controller) HealthCheck(ctx *fiber.Ctx) error { + return ctx.JSON(struct { + Status string `json:"status"` + }{Status: "OK"}) +} + +// ReadyCheck handler for the endpoint /ready +func (c *Controller) ReadyCheck(ctx *fiber.Ctx) error { + ready, _ := c.srv.CheckMongoServerStatus(ctx.Context()) + if ready { + return ctx.Status(fiber.StatusOK).JSON(struct { + Ready string `json:"ready"` + }{Ready: "OK"}) + } + return ctx.Status(fiber.StatusInternalServerError).JSON(struct { + Ready string `json:"ready"` + }{Ready: "NO"}) +} diff --git a/api/handlers/infraestructure/model.go b/api/handlers/infraestructure/model.go new file mode 100644 index 00000000..c96ead64 --- /dev/null +++ b/api/handlers/infraestructure/model.go @@ -0,0 +1,19 @@ +package infraestructure + +// MongoStatus represent a mongo server status. +type MongoStatus struct { + Ok int32 `bson:"ok"` + Host string `bson:"host"` + Version string `bson:"version"` + Process string `bson:"process"` + Pid int32 `bson:"pid"` + Uptime int32 `bson:"uptime"` + Connections *MongoConnections `bson:"connections"` +} + +// MongoConnections represents a mongo server connection. +type MongoConnections struct { + Current int32 `bson:"current"` + Available int32 `bson:"available"` + TotalCreated int32 `bson:"totalCreated"` +} diff --git a/api/handlers/infraestructure/repository.go b/api/handlers/infraestructure/repository.go new file mode 100644 index 00000000..a338a473 --- /dev/null +++ b/api/handlers/infraestructure/repository.go @@ -0,0 +1,46 @@ +package infraestructure + +import ( + "context" + "fmt" + + "github.com/pkg/errors" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.uber.org/zap" +) + +// Repository definition. +type Repository struct { + db *mongo.Database + logger *zap.Logger +} + +// NewRepository create a new Repository. +func NewRepository(db *mongo.Database, logger *zap.Logger) *Repository { + return &Repository{db: db, + logger: logger.With(zap.String("module", "InfraestructureRepository")), + } +} + +// GetMongoStatus get mongo server status +func (r *Repository) GetMongoStatus(ctx context.Context) (*MongoStatus, error) { + command := bson.D{{Key: "serverStatus", Value: 1}} + result := r.db.RunCommand(ctx, command) + if result.Err() != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("failed execute command mongo serverStatus", + zap.Error(result.Err()), zap.String("requestID", requestID)) + return nil, errors.WithStack(result.Err()) + } + + var mongoStatus MongoStatus + err := result.Decode(&mongoStatus) + if err != nil { + requestID := fmt.Sprintf("%v", ctx.Value("requestid")) + r.logger.Error("failed decoding cursor to *MongoStatus", zap.Error(err), + zap.String("requestID", requestID)) + return nil, errors.WithStack(err) + } + return &mongoStatus, nil +} diff --git a/api/handlers/infraestructure/service.go b/api/handlers/infraestructure/service.go new file mode 100644 index 00000000..64eecf34 --- /dev/null +++ b/api/handlers/infraestructure/service.go @@ -0,0 +1,38 @@ +package infraestructure + +import ( + "context" + "fmt" + + "go.uber.org/zap" +) + +type Service struct { + repo *Repository + logger *zap.Logger +} + +// NewService create a new governor.Service. +func NewService(dao *Repository, logger *zap.Logger) *Service { + return &Service{repo: dao, logger: logger.With(zap.String("module", "Infraestructureervice"))} +} + +// CheckMongoServerStatus +func (s *Service) CheckMongoServerStatus(ctx context.Context) (bool, error) { + mongoStatus, err := s.repo.GetMongoStatus(ctx) + if err != nil { + return false, err + } + + // check mongo server status + mongoStatusCheck := (mongoStatus.Ok == 1 && mongoStatus.Pid > 0 && mongoStatus.Uptime > 0) + if !mongoStatusCheck { + return false, fmt.Errorf("mongo server not ready (Ok = %v, Pid = %v, Uptime = %v)", mongoStatus.Ok, mongoStatus.Pid, mongoStatus.Uptime) + } + + // check mongo connections + if mongoStatus.Connections.Available <= 0 { + return false, fmt.Errorf("mongo server without available connections (availableConection = %v)", mongoStatus.Connections.Available) + } + return true, nil +} diff --git a/api/main.go b/api/main.go index 7a4f99a6..527ffb54 100644 --- a/api/main.go +++ b/api/main.go @@ -15,6 +15,7 @@ import ( "github.com/gofiber/fiber/v2/middleware/requestid" ipfslog "github.com/ipfs/go-log/v2" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/governor" + "github.com/wormhole-foundation/wormhole-explorer/api/handlers/infraestructure" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa" "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" @@ -69,16 +70,19 @@ func main() { vaaRepo := vaa.NewRepository(db, rootLogger) obsRepo := observations.NewRepository(db, rootLogger) governorRepo := governor.NewRepository(db, rootLogger) + infraestructureRepo := infraestructure.NewRepository(db, rootLogger) // Setup services vaaService := vaa.NewService(vaaRepo, rootLogger) obsService := observations.NewService(obsRepo, rootLogger) governorService := governor.NewService(governorRepo, rootLogger) + infraestructureService := infraestructure.NewService(infraestructureRepo, rootLogger) // Setup controllers vaaCtrl := vaa.NewController(vaaService, rootLogger) observationsCtrl := observations.NewController(obsService, rootLogger) governorCtrl := governor.NewController(governorService, rootLogger) + infraestructureCtrl := infraestructure.NewController(infraestructureService) // Setup app with custom error handling. response.SetEnableStackTrace(*cfg) @@ -98,7 +102,8 @@ func main() { api.Use(cors.New()) // TODO CORS restrictions? api.Use(middleware.ExtractPagination) - api.Get("/health", healthOk) + api.Get("/health", infraestructureCtrl.HealthCheck) + api.Get("/ready", infraestructureCtrl.ReadyCheck) // vaas resource vaas := api.Group("/vaas") diff --git a/devnet/fly.yaml b/devnet/fly.yaml index 04112d48..dd485ace 100644 --- a/devnet/fly.yaml +++ b/devnet/fly.yaml @@ -9,6 +9,10 @@ spec: clusterIP: None selector: app: fly + ports: + - port: 8001 + name: fly + protocol: TCP --- apiVersion: apps/v1 kind: StatefulSet @@ -33,11 +37,14 @@ spec: env: - name: MONGODB_URI value: mongodb://mongo-0.mongo,mongo-1.mongo,mongo-2.mongo/?replicaSet=rs0 + - name: API_PORT + value: "8001" readinessProbe: - exec: - command: - - test - - -e - - "/tmp/node.key" - initialDelaySeconds: 5 - periodSeconds: 5 + httpGet: + path: /api/ready + port: 8001 + successThreshold: 3 + ports: + - containerPort: 8001 + name: server + protocol: TCP diff --git a/fly/go.mod b/fly/go.mod index 7470b7ab..bbb944b1 100644 --- a/fly/go.mod +++ b/fly/go.mod @@ -8,6 +8,7 @@ require ( github.com/dgraph-io/ristretto v0.1.1 github.com/eko/gocache/v3 v3.1.2 github.com/ethereum/go-ethereum v1.10.21 + github.com/gofiber/fiber/v2 v2.40.1 github.com/ipfs/go-log/v2 v2.5.1 github.com/joho/godotenv v1.4.0 github.com/libp2p/go-libp2p-core v0.20.0 @@ -21,6 +22,7 @@ require ( require ( github.com/XiaoMi/pegasus-go-client v0.0.0-20210427083443-f3b6b08bc4c2 // indirect + github.com/andybalholm/brotli v1.0.4 // indirect github.com/benbjohnson/clock v1.3.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/bradfitz/gomemcache v0.0.0-20221031212613-62deef7fc822 // indirect @@ -95,8 +97,10 @@ require ( github.com/marten-seemann/qtls-go1-18 v0.1.2 // indirect github.com/marten-seemann/qtls-go1-19 v0.1.0 // indirect github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect + github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.16 // indirect github.com/mattn/go-pointer v0.0.1 // indirect + github.com/mattn/go-runewidth v0.0.14 // indirect github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect github.com/miekg/dns v1.1.50 // indirect github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect @@ -128,10 +132,14 @@ require ( github.com/prometheus/common v0.37.0 // indirect github.com/prometheus/procfs v0.8.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect + github.com/rivo/uniseg v0.2.0 // indirect github.com/sirupsen/logrus v1.9.0 // indirect github.com/spacemonkeygo/spacelog v0.0.0-20180420211403-2296661a0572 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cast v1.5.0 // indirect + github.com/valyala/bytebufferpool v1.0.0 // indirect + github.com/valyala/fasthttp v1.41.0 // indirect + github.com/valyala/tcplisten v1.0.0 // indirect github.com/whyrusleeping/go-keyspace v0.0.0-20160322163242-5b898ac5add1 // indirect github.com/whyrusleeping/timecache v0.0.0-20160911033111-cfcb2f1abfee // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect diff --git a/fly/go.sum b/fly/go.sum index b6b981e7..9c481a33 100644 --- a/fly/go.sum +++ b/fly/go.sum @@ -56,6 +56,8 @@ github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRF github.com/alecthomas/units v0.0.0-20190717042225-c3de453c63f4/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/alecthomas/units v0.0.0-20190924025748-f65c72e2690d/go.mod h1:rBZYJk541a8SKzHPHnH3zbiI+7dagKZ0cgpgrD7Fyho= github.com/allegro/bigcache/v3 v3.1.0 h1:H2Vp8VOvxcrB91o86fUSVJFqeuz8kpyyB02eH3bSzwk= +github.com/andybalholm/brotli v1.0.4 h1:V7DdXeJtZscaqfNuAdSRuRFzuiKlHSC/Zh3zl9qY3JY= +github.com/andybalholm/brotli v1.0.4/go.mod h1:fO7iG3H7G2nSZ7m0zPUDn85XEX2GTukHGRSepvi9Eig= github.com/anmitsu/go-shlex v0.0.0-20161002113705-648efa622239/go.mod h1:2FmKhYUyUczH0OGQWaF5ceTx0UBShxjsH6f8oGKYe2c= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= @@ -202,6 +204,8 @@ github.com/godbus/dbus/v5 v5.0.3/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5x github.com/godbus/dbus/v5 v5.0.4/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= github.com/godbus/dbus/v5 v5.1.0 h1:4KLkAxT3aOY8Li4FRJe/KvhoNFFxo0m6fNuFUO8QJUk= github.com/godbus/dbus/v5 v5.1.0/go.mod h1:xhWf0FNVPg57R7Z0UbKHbJfkEywrmjJnf7w5xrFpKfA= +github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4= +github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk= github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v0.0.0-20210429001901-424d2337a529/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= github.com/golang/glog v1.0.0 h1:nfP3RFugxnNRyKgeWd4oI1nYvXpxrx8ck8ZrcizshdQ= @@ -382,6 +386,7 @@ github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+o github.com/kkdai/bstream v0.0.0-20161212061736-f391b8402d23/go.mod h1:J+Gs4SYgM6CZQHDETBtE9HaSEkGmuNXF86RwHhHUvq4= github.com/klauspost/compress v1.12.3/go.mod h1:8dP1Hq4DHOhN9w426knH3Rhby4rFm6D8eO+e+Dq5Gzg= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.15.9/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHUDtV4Yw2GlzU= github.com/klauspost/compress v1.15.11 h1:Lcadnb3RKGin4FYM/orgq0qde+nc15E5Cbqg4B9Sx9c= github.com/klauspost/compress v1.15.11/go.mod h1:QPwzmACJjUTFsnSHH934V6woptycfrDDJnH7hvFVbGM= github.com/klauspost/cpuid/v2 v2.0.4/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= @@ -469,12 +474,16 @@ github.com/marten-seemann/qtls-go1-19 v0.1.0/go.mod h1:5HTDWtVudo/WFsHKRNuOhWlbd github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk= github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU= github.com/mattn/go-colorable v0.1.1/go.mod h1:FuOcm+DKB9mbwrcAfNl7/TZVBZ6rcnceauSikq3lYCQ= +github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA= +github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg= github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94= github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ= github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM= github.com/mattn/go-pointer v0.0.1 h1:n+XhsuGeVO6MEAp7xyEukFINEa+Quek5psIR/ylA6o0= github.com/mattn/go-pointer v0.0.1/go.mod h1:2zXcozF6qYGgmsG+SeTZz3oAbFLdD3OWqnUbNvJZAlc= +github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU= +github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w= github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0= github.com/matttproud/golang_protobuf_extensions v1.0.4 h1:mmDVorXM7PCGKw94cs5zkfA9PSy5pEvNWRP0ET0TIVo= github.com/matttproud/golang_protobuf_extensions v1.0.4/go.mod h1:BSXmuO+STAnVfrANrmjBb36TMTDstsz7MSK+HVaYKv4= @@ -630,6 +639,8 @@ github.com/raulk/go-watchdog v1.3.0 h1:oUmdlHxdkXRJlwfG0O9omj8ukerm8MEQavSiDTEtB github.com/raulk/go-watchdog v1.3.0/go.mod h1:fIvOnLbF0b0ZwkB9YU4mOW9Did//4vPZtDqv66NfsMU= github.com/regen-network/protobuf v1.3.3-alpha.regen.1 h1:OHEc+q5iIAXpqiqFKeLpu5NwTIkVXUs48vFMwzqpqY4= github.com/regen-network/protobuf v1.3.3-alpha.regen.1/go.mod h1:2DjTFR1HhMQhiWC5sZ4OhQ3+NtdbZ6oBDKQwq5Ou+FI= +github.com/rivo/uniseg v0.2.0 h1:S1pD9weZBuJdFmowNwbpi7BJ8TNftyUImj/0WQi72jY= +github.com/rivo/uniseg v0.2.0/go.mod h1:J6wj4VEh+S6ZtnVlnTBMWIodfgj8LQOQFoIToxlJtxc= github.com/rogpeppe/fastuuid v1.2.0/go.mod h1:jVj6XXZzXRy/MSR5jhDC/2q6DgLz+nrA6LYCDYWNEvQ= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.6.1 h1:/FiVV8dS/e+YqF2JvO3yXRFbBLTIuSDkuC7aBOAvL+k= @@ -707,6 +718,12 @@ github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhV github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= +github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY= +github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY= +github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8= +github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= github.com/warpfork/go-wish v0.0.0-20200122115046-b9ea61034e4a h1:G++j5e0OC488te356JvdhaM8YS6nMsjLAYF7JxCv07w= @@ -782,6 +799,7 @@ golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPh golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU= golang.org/x/crypto v0.1.0/go.mod h1:RecgLatLF4+eUMCP1PoPZQb+cVrJcOPbHkTkbkB9sbw= @@ -875,6 +893,7 @@ golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk= golang.org/x/net v0.0.0-20220624214902-1bab6f366d9e/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c= +golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk= golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco= golang.org/x/net v0.2.0 h1:sZfSu1wtKLGlWI4ZZayP0ck9Y73K1ynO6gqzTdBVdPU= golang.org/x/net v0.2.0/go.mod h1:KqCZLdyyvdV855qA2rE3GC2aiw5xGR5TEjj8smXukLY= @@ -973,6 +992,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220704084225-05e143d24a9e/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20221010170243-090e33056c14/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/fly/internal/sqs/sqs_consumer.go b/fly/internal/sqs/sqs_consumer.go index 2086d0b1..acb7adbf 100644 --- a/fly/internal/sqs/sqs_consumer.go +++ b/fly/internal/sqs/sqs_consumer.go @@ -97,3 +97,14 @@ func (c *Consumer) DeleteMessage(msg *aws_sqs.Message) error { func (c *Consumer) GetVisibilityTimeout() time.Duration { return time.Duration(*c.visibilityTimeout * int64(time.Second)) } + +// GetQueueAttributes get queue attributes. +func (c *Consumer) GetQueueAttributes() (*aws_sqs.GetQueueAttributesOutput, error) { + params := &aws_sqs.GetQueueAttributesInput{ + QueueUrl: aws.String(c.url), + AttributeNames: []*string{ + aws.String("CreatedTimestamp"), + }, + } + return c.api.GetQueueAttributes(params) +} diff --git a/fly/main.go b/fly/main.go index 4d68ec33..330f4b03 100644 --- a/fly/main.go +++ b/fly/main.go @@ -16,6 +16,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/fly/migration" "github.com/wormhole-foundation/wormhole-explorer/fly/processor" "github.com/wormhole-foundation/wormhole-explorer/fly/queue" + "github.com/wormhole-foundation/wormhole-explorer/fly/server" "github.com/wormhole-foundation/wormhole-explorer/fly/storage" "github.com/certusone/wormhole/node/pkg/common" @@ -124,10 +125,10 @@ func newCache() (cache.CacheInterface[bool], error) { // Creates two callbacks depending on whether the execution is local (memory queue) or not (SQS queue) // callback to obtain queue messages from a queue // callback to publish vaa non pyth messages to a sink -func newVAAConsumePublish(isLocal bool, logger *zap.Logger) (processor.VAAQueueConsumeFunc, processor.VAAPushFunc) { +func newVAAConsumePublish(isLocal bool, logger *zap.Logger) (*sqs.Consumer, processor.VAAQueueConsumeFunc, processor.VAAPushFunc) { if isLocal { vaaQueue := queue.NewVAAInMemory() - return vaaQueue.Consume, vaaQueue.Publish + return nil, vaaQueue.Consume, vaaQueue.Publish } sqsProducer, err := newSQSProducer() if err != nil { @@ -140,11 +141,12 @@ func newVAAConsumePublish(isLocal bool, logger *zap.Logger) (processor.VAAQueueC } vaaQueue := queue.NewVAASQS(sqsProducer, sqsConsumer, logger) - return vaaQueue.Consume, vaaQueue.Publish + return sqsConsumer, vaaQueue.Consume, vaaQueue.Publish } func main() { // Node's main lifecycle context. + rootCtx, rootCtxCancel = context.WithCancel(context.Background()) defer rootCtxCancel() // main @@ -262,7 +264,7 @@ func main() { // Creates a deduplicator to discard VAA messages that were processed previously deduplicator := deduplicator.New(cache, logger) // Creates two callbacks - vaaQueueConsume, nonPythVaaPublish := newVAAConsumePublish(isLocal != nil && *isLocal, logger) + sqsConsumer, vaaQueueConsume, nonPythVaaPublish := newVAAConsumePublish(isLocal != nil && *isLocal, logger) // Creates a instance to consume VAA messages from Gossip network and handle the messages // When recive a message, the message filter by deduplicator // if VAA is from pyhnet should be saved directly to repository @@ -276,6 +278,10 @@ func main() { vaaQueueConsumer.Start(rootCtx) vaaGossipConsumerSplitter.Start(rootCtx) + // start fly http server. + server := server.NewServer(logger, repository, sqsConsumer, *isLocal) + server.Start() + go func() { for { select { @@ -365,7 +371,7 @@ func main() { <-rootCtx.Done() // TODO: wait for things to shut down gracefully vaaGossipConsumerSplitter.Close() - + server.Stop() } func verifyObservation(logger *zap.Logger, obs *gossipv1.SignedObservation, gs *common.GuardianSet) bool { diff --git a/fly/migration/migration.go b/fly/migration/migration.go index 83fc08fd..a0bc0f2a 100644 --- a/fly/migration/migration.go +++ b/fly/migration/migration.go @@ -4,19 +4,138 @@ import ( "context" "errors" + "go.mongodb.org/mongo-driver/bson" "go.mongodb.org/mongo-driver/mongo" "go.mongodb.org/mongo-driver/mongo/options" ) +// TODO: move this to migration tool that support mongodb. func Run(db *mongo.Database) error { - // TODO: change this to use a migration tool. + // Created governorConfig collection. + err := db.CreateCollection(context.TODO(), "governorConfig") + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // Created governorStatus collection. + err = db.CreateCollection(context.TODO(), "governorStatus") + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // Created heartbeats collection. + err = db.CreateCollection(context.TODO(), "heartbeats") + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // Created observations collection. + err = db.CreateCollection(context.TODO(), "observations") + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // Created vaaCounts collection. + err = db.CreateCollection(context.TODO(), "vaaCounts") + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // Create vaas collection. + err = db.CreateCollection(context.TODO(), "vaas") + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // Create vassPythnet capped collection. isCapped := true var sizeCollection, maxDocuments int64 = 50 * 1024 * 1024, 10000 collectionOptions := options.CreateCollectionOptions{ Capped: &isCapped, SizeInBytes: &sizeCollection, MaxDocuments: &maxDocuments} - err := db.CreateCollection(context.TODO(), "vaasPythnet", &collectionOptions) + err = db.CreateCollection(context.TODO(), "vaasPythnet", &collectionOptions) + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // create index in vaas collection by vaa key (emitterchain, emitterAddr, sequence) + indexVaaByKey := mongo.IndexModel{ + Keys: bson.D{ + {Key: "timestamp", Value: -1}, + {Key: "emitterAddr", Value: 1}, + {Key: "emitterChain", Value: 1}, + }} + _, err = db.Collection("vaas").Indexes().CreateOne(context.TODO(), indexVaaByKey) + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + indexVaaByTimestamp := mongo.IndexModel{ + Keys: bson.D{ + {Key: "emitterChain", Value: 1}, + {Key: "emitterAddr", Value: 1}, + {Key: "sequence", Value: 1}, + }} + _, err = db.Collection("vaas").Indexes().CreateOne(context.TODO(), indexVaaByTimestamp) + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // create index in observations collection by indexedAt. + indexObservationsByIndexedAt := mongo.IndexModel{Keys: bson.D{{Key: "indexedAt", Value: 1}}} + _, err = db.Collection("observations").Indexes().CreateOne(context.TODO(), indexObservationsByIndexedAt) + if err != nil { + target := &mongo.CommandError{} + isCommandError := errors.As(err, target) + if !isCommandError || err.(mongo.CommandError).Code != 48 { + return err + } + } + + // create index in observations collect. + indexObservationsByEmitterChainAndAddressAndSequence := mongo.IndexModel{ + Keys: bson.D{ + {Key: "emitterChain", Value: 1}, + {Key: "emitterAddr", Value: 1}, + {Key: "sequence", Value: 1}}} + _, err = db.Collection("observations").Indexes().CreateOne(context.TODO(), indexObservationsByEmitterChainAndAddressAndSequence) if err != nil { target := &mongo.CommandError{} isCommandError := errors.As(err, target) diff --git a/fly/server/controller.go b/fly/server/controller.go new file mode 100644 index 00000000..33c4111d --- /dev/null +++ b/fly/server/controller.go @@ -0,0 +1,89 @@ +package server + +import ( + "context" + + "github.com/gofiber/fiber/v2" + "github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs" + "github.com/wormhole-foundation/wormhole-explorer/fly/storage" +) + +// Controller definition. +type Controller struct { + repository *storage.Repository + consumer *sqs.Consumer + isLocal bool +} + +// NewController creates a Controller instance. +func NewController(repo *storage.Repository, consumer *sqs.Consumer, isLocal bool) *Controller { + return &Controller{repository: repo, consumer: consumer, isLocal: isLocal} +} + +// HealthCheck handler for the endpoint /health. +func (c *Controller) HealthCheck(ctx *fiber.Ctx) error { + return ctx.JSON(struct { + Status string `json:"status"` + }{Status: "OK"}) +} + +// ReadyCheck handler for the endpoint /ready +func (c *Controller) ReadyCheck(ctx *fiber.Ctx) error { + // check mongo db is ready. + mongoStatus := c.checkMongoStatus(ctx.Context()) + if !mongoStatus { + return ctx.Status(fiber.StatusInternalServerError).JSON(struct { + Ready string `json:"ready"` + }{Ready: "NO"}) + } + // check aws SQS is ready. + queueStatus := c.checkQueueStatus(ctx.Context()) + if !queueStatus { + return ctx.Status(fiber.StatusInternalServerError).JSON(struct { + Ready string `json:"ready"` + }{Ready: "NO"}) + } + + // return success response. + return ctx.Status(fiber.StatusOK).JSON(struct { + Ready string `json:"ready"` + }{Ready: "OK"}) +} + +func (c *Controller) checkMongoStatus(ctx context.Context) bool { + mongoStatus, err := c.repository.GetMongoStatus(ctx) + if err != nil { + return false + } + + // check mongo server status + mongoStatusCheck := (mongoStatus.Ok == 1 && mongoStatus.Pid > 0 && mongoStatus.Uptime > 0) + if !mongoStatusCheck { + return false + } + + // check mongo connections + if mongoStatus.Connections.Available <= 0 { + return false + } + return true +} + +func (c *Controller) checkQueueStatus(ctx context.Context) bool { + // vaa queue handle in memory [local enviroment] + if c.isLocal { + return true + } + // get queue attributes + queueAttributes, err := c.consumer.GetQueueAttributes() + if err != nil || queueAttributes == nil { + return false + } + + // check queue created + createdTimestamp := queueAttributes.Attributes["CreatedTimestamp"] + if createdTimestamp == nil { + return false + } + return *createdTimestamp != "" +} diff --git a/fly/server/server.go b/fly/server/server.go new file mode 100644 index 00000000..5d5dfec6 --- /dev/null +++ b/fly/server/server.go @@ -0,0 +1,49 @@ +package server + +import ( + "os" + + "github.com/gofiber/fiber/v2" + fiberLog "github.com/gofiber/fiber/v2/middleware/logger" + "github.com/wormhole-foundation/wormhole-explorer/fly/internal/sqs" + "github.com/wormhole-foundation/wormhole-explorer/fly/storage" + "go.uber.org/zap" +) + +type Server struct { + app *fiber.App + port string + logger *zap.Logger +} + +func NewServer(logger *zap.Logger, repository *storage.Repository, consumer *sqs.Consumer, isLocal bool) *Server { + port := os.Getenv("API_PORT") + if port == "" { + logger.Fatal("You must set your 'API_PORT' environmental variable") + } + ctrl := NewController(repository, consumer, isLocal) + app := fiber.New() + app.Use(fiberLog.New(fiberLog.Config{ + Format: "level=info timestamp=${time} method=${method} path=${path} status${status} request_id=${locals:requestid}\n", + })) + api := app.Group("/api") + api.Get("/health", ctrl.HealthCheck) + api.Get("/ready", ctrl.ReadyCheck) + return &Server{ + app: app, + port: port, + logger: logger, + } +} + +// Start listen serves HTTP requests from addr. +func (s *Server) Start() { + go func() { + s.app.Listen(":" + s.port) + }() +} + +// Stop gracefull server. +func (s *Server) Stop() { + _ = s.app.Shutdown() +} diff --git a/fly/storage/documents.go b/fly/storage/documents.go index f4fe7d21..762df0ef 100644 --- a/fly/storage/documents.go +++ b/fly/storage/documents.go @@ -39,3 +39,21 @@ func indexedAt(t time.Time) IndexingTimestamps { IndexedAt: t, } } + +// MongoStatus represent a mongo server status. +type MongoStatus struct { + Ok int32 `bson:"ok"` + Host string `bson:"host"` + Version string `bson:"version"` + Process string `bson:"process"` + Pid int32 `bson:"pid"` + Uptime int32 `bson:"uptime"` + Connections *MongoConnections `bson:"connections"` +} + +// MongoConnections represents a mongo server connection. +type MongoConnections struct { + Current int32 `bson:"current"` + Available int32 `bson:"available"` + TotalCreated int32 `bson:"totalCreated"` +} diff --git a/fly/storage/repository.go b/fly/storage/repository.go index 3f0bfa2e..bbef8839 100644 --- a/fly/storage/repository.go +++ b/fly/storage/repository.go @@ -178,3 +178,19 @@ func (s *Repository) updateVAACount(chainID vaa.ChainID) { func (s *Repository) isNewRecord(result *mongo.UpdateResult) bool { return result.MatchedCount == 0 && result.ModifiedCount == 0 && result.UpsertedCount == 1 } + +// GetMongoStatus get mongo server status +func (r *Repository) GetMongoStatus(ctx context.Context) (*MongoStatus, error) { + command := bson.D{{Key: "serverStatus", Value: 1}} + result := r.db.RunCommand(ctx, command) + if result.Err() != nil { + return nil, result.Err() + } + + var mongoStatus MongoStatus + err := result.Decode(&mongoStatus) + if err != nil { + return nil, err + } + return &mongoStatus, nil +}