diff --git a/api/docs/docs.go b/api/docs/docs.go index 26e6af4f..bc020c48 100644 --- a/api/docs/docs.go +++ b/api/docs/docs.go @@ -979,6 +979,35 @@ const docTemplate = `{ } } }, + "/api/v1/protocols/stats": { + "get": { + "description": "Returns the representative stats for the top protocols", + "tags": [ + "wormholescan" + ], + "operationId": "get-top-protocols-stats", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/protocols.ProtocolTotalValuesDTO" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/protocols.ProtocolTotalValuesDTO" + } + } + } + } + } + }, "/api/v1/ready": { "get": { "description": "Ready check", @@ -2760,6 +2789,35 @@ const docTemplate = `{ } } }, + "protocols.ProtocolTotalValuesDTO": { + "type": "object", + "properties": { + "error": { + "type": "string" + }, + "last_day_diff_percentage": { + "type": "string" + }, + "last_day_messages": { + "type": "integer" + }, + "protocol": { + "type": "string" + }, + "total_messages": { + "type": "integer" + }, + "total_value_locked": { + "type": "number" + }, + "total_value_secured": { + "type": "number" + }, + "total_value_transferred": { + "type": "number" + } + } + }, "relays.DeliveryReponse": { "type": "object", "properties": { diff --git a/api/docs/swagger.json b/api/docs/swagger.json index 749936cf..3f5a7a06 100644 --- a/api/docs/swagger.json +++ b/api/docs/swagger.json @@ -972,6 +972,35 @@ } } }, + "/api/v1/protocols/stats": { + "get": { + "description": "Returns the representative stats for the top protocols", + "tags": [ + "wormholescan" + ], + "operationId": "get-top-protocols-stats", + "responses": { + "200": { + "description": "OK", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/protocols.ProtocolTotalValuesDTO" + } + } + }, + "500": { + "description": "Internal Server Error", + "schema": { + "type": "array", + "items": { + "$ref": "#/definitions/protocols.ProtocolTotalValuesDTO" + } + } + } + } + } + }, "/api/v1/ready": { "get": { "description": "Ready check", @@ -2753,6 +2782,35 @@ } } }, + "protocols.ProtocolTotalValuesDTO": { + "type": "object", + "properties": { + "error": { + "type": "string" + }, + "last_day_diff_percentage": { + "type": "string" + }, + "last_day_messages": { + "type": "integer" + }, + "protocol": { + "type": "string" + }, + "total_messages": { + "type": "integer" + }, + "total_value_locked": { + "type": "number" + }, + "total_value_secured": { + "type": "number" + }, + "total_value_transferred": { + "type": "number" + } + } + }, "relays.DeliveryReponse": { "type": "object", "properties": { diff --git a/api/docs/swagger.yaml b/api/docs/swagger.yaml index f3cca9ee..adcc63b2 100644 --- a/api/docs/swagger.yaml +++ b/api/docs/swagger.yaml @@ -522,6 +522,25 @@ definitions: tokenChain: $ref: '#/definitions/vaa.ChainID' type: object + protocols.ProtocolTotalValuesDTO: + properties: + error: + type: string + last_day_diff_percentage: + type: string + last_day_messages: + type: integer + protocol: + type: string + total_messages: + type: integer + total_value_locked: + type: number + total_value_secured: + type: number + total_value_transferred: + type: number + type: object relays.DeliveryReponse: properties: budget: @@ -1774,6 +1793,25 @@ paths: description: Internal Server Error tags: - wormholescan + /api/v1/protocols/stats: + get: + description: Returns the representative stats for the top protocols + operationId: get-top-protocols-stats + responses: + "200": + description: OK + schema: + items: + $ref: '#/definitions/protocols.ProtocolTotalValuesDTO' + type: array + "500": + description: Internal Server Error + schema: + items: + $ref: '#/definitions/protocols.ProtocolTotalValuesDTO' + type: array + tags: + - wormholescan /api/v1/ready: get: description: Ready check diff --git a/api/handlers/protocols/repository.go b/api/handlers/protocols/repository.go new file mode 100644 index 00000000..235558c1 --- /dev/null +++ b/api/handlers/protocols/repository.go @@ -0,0 +1,147 @@ +package protocols + +import ( + "context" + "fmt" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/query" + "github.com/mitchellh/mapstructure" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "go.uber.org/zap" +) + +const QueryTemplateLatestPoint = ` +from(bucket: "%s") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") + |> last() + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +const QueryTemplateLast24Point = ` +from(bucket: "%s") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") + |> first() + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +const QueryTemplateActivityLatestPoint = ` +from(bucket: "%s") + |> range(start: -1d) + |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") + |> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"]) + |> last() + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") +` + +type Repository struct { + queryAPI QueryDoer + logger *zap.Logger + statsBucket string + activityBucket string + statsVersion string + activityVersion string +} + +type rowStat struct { + Protocol string `mapstructure:"protocol"` + TotalMessages uint64 `mapstructure:"total_messages"` + TotalValueLocked float64 `mapstructure:"total_value_locked"` +} + +type rowActivity struct { + Protocol string `mapstructure:"protocol"` + DestinationChainId string `mapstructure:"destination_chain_id"` + EmitterChainId string `mapstructure:"emitter_chain_id"` + From string `mapstructure:"from"` + TotalUsd float64 `mapstructure:"total_usd"` + TotalValueTransferred float64 `mapstructure:"total_value_transferred"` + TotalVolumeSecure float64 `mapstructure:"total_value_secure"` + Txs uint64 `mapstructure:"txs"` +} + +type stats struct { + Latest rowStat + Last24 rowStat +} + +type QueryDoer interface { + Query(ctx context.Context, query string) (QueryResult, error) +} + +type queryApiWrapper struct { + qApi api.QueryAPI +} + +type QueryResult interface { + Next() bool + Record() *query.FluxRecord + Err() error + Close() error +} + +func WrapQueryAPI(qApi api.QueryAPI) QueryDoer { + return &queryApiWrapper{qApi: qApi} +} + +func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository { + return &Repository{ + queryAPI: qApi, + statsBucket: statsBucket, + activityBucket: activityBucket, + statsVersion: statsVersion, + activityVersion: activityVersion, + logger: logger, + } +} + +func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult, error) { + return q.qApi.Query(ctx, query) +} + +// returns latest and last 24 hr stats for a given protocol +func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) { + // fetch latest stat + latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion) + if err != nil { + return stats{}, err + } + // fetch last 24 hr stat + last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion) + return stats{ + Latest: latest, + Last24: last24hr, + }, err +} + +func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) { + return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion) +} + +func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) { + var res T + q := buildQuery(queryTemplate, bucket, measurement, contributor, version) + result, err := queryAPI.Query(ctx, q) + if err != nil { + logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q)) + return res, err + } + defer result.Close() + + if !result.Next() { + if result.Err() != nil { + logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q)) + return res, result.Err() + } + logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q)) + return res, err + } + + err = mapstructure.Decode(result.Record().Values(), &res) + return res, err +} + +func buildQuery(queryTemplate, bucket, measurement, contributorName, version string) string { + return fmt.Sprintf(queryTemplate, bucket, measurement, contributorName, version) +} diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go new file mode 100644 index 00000000..45bfbe3a --- /dev/null +++ b/api/handlers/protocols/service.go @@ -0,0 +1,99 @@ +package protocols + +import ( + "context" + "go.uber.org/zap" + "strconv" + "sync" +) + +type Service struct { + Protocols []string + repo *Repository + logger *zap.Logger +} + +type ProtocolTotalValuesDTO struct { + Protocol string `json:"protocol"` + TotalMessages uint64 `json:"total_messages"` + TotalValueLocked float64 `json:"total_value_locked,omitempty"` + TotalValueSecured float64 `json:"total_value_secured,omitempty"` + TotalValueTransferred float64 `json:"total_value_transferred,omitempty"` + LastDayMessages uint64 `json:"last_day_messages,omitempty"` + LastDayDiffPercentage string `json:"last_day_diff_percentage,omitempty"` + Error string `json:"error,omitempty"` +} + +func NewService(protocols []string, repo *Repository, logger *zap.Logger) *Service { + return &Service{ + Protocols: protocols, + repo: repo, + logger: logger, + } +} + +func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO { + + wg := &sync.WaitGroup{} + wg.Add(len(s.Protocols)) + results := make(chan ProtocolTotalValuesDTO, len(s.Protocols)) + + for i := range s.Protocols { + go s.getProtocolTotalValues(ctx, wg, s.Protocols[i], results) + } + wg.Wait() + close(results) + + resultsSlice := make([]ProtocolTotalValuesDTO, 0, len(s.Protocols)) + for r := range results { + resultsSlice = append(resultsSlice, r) + } + return resultsSlice +} + +func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, contributor string, results chan<- ProtocolTotalValuesDTO) { + defer wg.Done() + + type statsResult struct { + result stats + Err error + } + statsRes := make(chan statsResult, 1) + go func() { + rowStats, errStats := s.repo.getProtocolStats(ctx, contributor) + statsRes <- statsResult{result: rowStats, Err: errStats} + close(statsRes) + }() + + activity, err := s.repo.getProtocolActivity(ctx, contributor) + if err != nil { + s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", contributor)) + results <- ProtocolTotalValuesDTO{Protocol: contributor, Error: err.Error()} + return + } + + rStats := <-statsRes + if rStats.Err != nil { + s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", contributor)) + results <- ProtocolTotalValuesDTO{Protocol: contributor, Error: rStats.Err.Error()} + return + } + + dto := ProtocolTotalValuesDTO{ + Protocol: contributor, + TotalValueLocked: rStats.result.Latest.TotalValueLocked, + TotalMessages: rStats.result.Latest.TotalMessages, + TotalValueTransferred: activity.TotalValueTransferred, + TotalValueSecured: activity.TotalVolumeSecure, + } + + totalMsgNow := rStats.result.Latest.TotalMessages + totalMessagesAsFromLast24hr := rStats.result.Last24.TotalMessages + if totalMessagesAsFromLast24hr != 0 { + last24HrMessages := totalMsgNow - totalMessagesAsFromLast24hr + dto.LastDayMessages = last24HrMessages + dto.LastDayDiffPercentage = strconv.FormatFloat(float64(last24HrMessages)/float64(totalMessagesAsFromLast24hr)*100, 'f', 2, 64) + "%" + } + + results <- dto +} diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go new file mode 100644 index 00000000..4ab43039 --- /dev/null +++ b/api/handlers/protocols/service_test.go @@ -0,0 +1,186 @@ +package protocols_test + +import ( + "context" + "errors" + "fmt" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/query" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/mock" + "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "go.uber.org/zap" + "testing" +) + +func TestService_GetProtocolsTotalValues(t *testing.T) { + var errNil error + respStatsLatest := &mockQueryTableResult{} + respStatsLatest.On("Next").Return(true) + respStatsLatest.On("Err").Return(errNil) + respStatsLatest.On("Close").Return(errNil) + respStatsLatest.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(7), + "total_value_locked": float64(5), + })) + + respStatsLastDay := &mockQueryTableResult{} + respStatsLastDay.On("Next").Return(true) + respStatsLastDay.On("Err").Return(errNil) + respStatsLastDay.On("Close").Return(errNil) + respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_locked": float64(5), + })) + + respActivityLast := &mockQueryTableResult{} + respActivityLast.On("Next").Return(true) + respActivityLast.On("Err").Return(errNil) + respActivityLast.On("Close").Return(errNil) + respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_transferred": float64(7), + "total_value_secure": float64(9), + })) + + ctx := context.Background() + queryAPI := &mockQueryAPI{} + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) + + repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop()) + + values := service.GetProtocolsTotalValues(ctx) + assert.Equal(t, 1, len(values)) + assert.Equal(t, "protocol1", values[0].Protocol) + assert.Equal(t, 5.00, values[0].TotalValueLocked) + assert.Equal(t, uint64(7), values[0].TotalMessages) + assert.Equal(t, 9.00, values[0].TotalValueSecured) + assert.Equal(t, 7.00, values[0].TotalValueTransferred) + assert.Equal(t, uint64(3), values[0].LastDayMessages) + assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage) + +} + +func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) { + var errNil error + respStatsLatest := &mockQueryTableResult{} + respStatsLatest.On("Next").Return(true) + respStatsLatest.On("Err").Return(errNil) + respStatsLatest.On("Close").Return(errNil) + respStatsLatest.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(7), + "total_value_locked": float64(5), + })) + + respStatsLastDay := &mockQueryTableResult{} + respStatsLastDay.On("Next").Return(true) + respStatsLastDay.On("Err").Return(errNil) + respStatsLastDay.On("Close").Return(errNil) + respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_locked": float64(5), + })) + + ctx := context.Background() + queryAPI := &mockQueryAPI{} + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error")) + + repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop()) + + values := service.GetProtocolsTotalValues(ctx) + assert.Equal(t, 1, len(values)) + assert.Equal(t, "protocol1", values[0].Protocol) + assert.NotNil(t, values[0].Error) + assert.Equal(t, "mocked_fetching_activity_error", values[0].Error) +} + +func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { + var errNil error + + respStatsLastDay := &mockQueryTableResult{} + respStatsLastDay.On("Next").Return(true) + respStatsLastDay.On("Err").Return(errNil) + respStatsLastDay.On("Close").Return(errNil) + respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_locked": float64(5), + })) + + respActivityLast := &mockQueryTableResult{} + respActivityLast.On("Next").Return(true) + respActivityLast.On("Err").Return(errNil) + respActivityLast.On("Close").Return(errNil) + respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "protocol": "protocol1", + "total_messages": uint64(4), + "total_value_transferred": float64(7), + "total_volume_secure": float64(9), + })) + + ctx := context.Background() + queryAPI := &mockQueryAPI{} + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil) + + repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop()) + + values := service.GetProtocolsTotalValues(ctx) + assert.Equal(t, 1, len(values)) + assert.Equal(t, "protocol1", values[0].Protocol) + assert.NotNil(t, values[0].Error) + assert.Equal(t, "mocked_fetching_stats_error", values[0].Error) +} + +type mockQueryAPI struct { + mock.Mock +} + +func (m *mockQueryAPI) Query(ctx context.Context, q string) (protocols.QueryResult, error) { + args := m.Called(ctx, q) + return args.Get(0).(protocols.QueryResult), args.Error(1) +} + +type mockQueryTableResult struct { + mock.Mock +} + +func (m *mockQueryTableResult) Next() bool { + args := m.Called() + return args.Bool(0) +} + +func (m *mockQueryTableResult) Record() *query.FluxRecord { + args := m.Called() + return args.Get(0).(*query.FluxRecord) +} + +func (m *mockQueryTableResult) Err() error { + args := m.Called() + return args.Error(0) +} + +func (m *mockQueryTableResult) Close() error { + args := m.Called() + return args.Error(0) +} diff --git a/api/internal/config/config.go b/api/internal/config/config.go index 3ce0c843..afc61483 100644 --- a/api/internal/config/config.go +++ b/api/internal/config/config.go @@ -67,6 +67,9 @@ type AppConfig struct { //Api Tokens Tokens string } + Protocols []string + ProtocolsStatsVersion string + ProtocolsActivityVersion string } // GetLogLevel get zapcore.Level define in the configuraion. diff --git a/api/main.go b/api/main.go index e5270293..50f8f38a 100644 --- a/api/main.go +++ b/api/main.go @@ -4,6 +4,7 @@ import ( "context" _ "embed" "fmt" + "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" "net/http" "os" "os/signal" @@ -158,6 +159,7 @@ func main() { relaysRepo := relays.NewRepository(db.Database, rootLogger) operationsRepo := operations.NewRepository(db.Database, rootLogger) statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger) + protocolsRepo := protocols.NewRepository(protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.Bucket30Days, cfg.Influx.Bucket30Days, cfg.ProtocolsStatsVersion, cfg.ProtocolsActivityVersion, rootLogger) // create token provider tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork) @@ -177,6 +179,7 @@ func main() { relaysService := relays.NewService(relaysRepo, rootLogger) operationsService := operations.NewService(operationsRepo, rootLogger) statsService := stats.NewService(statsRepo, cache, expirationTime, metrics, rootLogger) + protocolsService := protocols.NewService(cfg.Protocols, protocolsRepo, rootLogger) // Set up a custom error handler response.SetEnableStackTrace(*cfg) @@ -218,7 +221,7 @@ func main() { // Set up route handlers app.Get("/swagger.json", GetSwagger) - wormscan.RegisterRoutes(app, rootLogger, addressService, vaaService, obsService, governorService, infrastructureService, transactionsService, relaysService, operationsService, statsService) + wormscan.RegisterRoutes(app, rootLogger, addressService, vaaService, obsService, governorService, infrastructureService, transactionsService, relaysService, operationsService, statsService, protocolsService) guardian.RegisterRoutes(cfg, app, rootLogger, vaaService, governorService, heartbeatsService) // Set up gRPC handlers diff --git a/api/routes/wormscan/protocols/controller.go b/api/routes/wormscan/protocols/controller.go new file mode 100644 index 00000000..fd6b9e27 --- /dev/null +++ b/api/routes/wormscan/protocols/controller.go @@ -0,0 +1,45 @@ +package protocols + +import ( + "context" + "github.com/gofiber/fiber/v2" + "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" + "go.uber.org/zap" +) + +type Controller struct { + srv service + logger *zap.Logger +} + +type service interface { + GetProtocolsTotalValues(ctx context.Context) []protocols.ProtocolTotalValuesDTO +} + +func NewController(logger *zap.Logger, service service) *Controller { + return &Controller{ + logger: logger.With(zap.String("module", "ContributorsController")), + srv: service, + } +} + +// GetProtocolsTotalValues godoc +// @Description Returns the representative stats for the top protocols +// @Tags wormholescan +// @ID get-top-protocols-stats +// @Success 200 {object} []protocols.ProtocolTotalValuesDTO +// @Failure 500 {object} []protocols.ProtocolTotalValuesDTO +// @Router /api/v1/protocols/stats [get] +func (c *Controller) GetProtocolsTotalValues(ctx *fiber.Ctx) error { + values := c.srv.GetProtocolsTotalValues(ctx.Context()) + allFailed := true + for i := range values { + allFailed = allFailed && len(values[i].Error) > 0 + } + + err := ctx.JSON(values) + if allFailed && len(values) > 0 { + return ctx.SendStatus(fiber.StatusInternalServerError) + } + return err +} diff --git a/api/routes/wormscan/protocols/controller_test.go b/api/routes/wormscan/protocols/controller_test.go new file mode 100644 index 00000000..f7976dba --- /dev/null +++ b/api/routes/wormscan/protocols/controller_test.go @@ -0,0 +1,68 @@ +package protocols_test + +import ( + "context" + "errors" + "github.com/gofiber/fiber/v2" + "github.com/stretchr/testify/assert" + "github.com/valyala/fasthttp" + contributorsHandlerPkg "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" + "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/protocols" + "go.uber.org/zap" + "net/http" + "testing" +) + +func TestGetContributorsTotalValues(t *testing.T) { + + app := fiber.New() + defer app.Shutdown() + c := app.AcquireCtx(&fasthttp.RequestCtx{}) + + input := []struct { + testName string + mockError string + expectedStatusCode int + expectedResponseBody string + }{ + { + testName: "succeed scenario", + mockError: "", + expectedStatusCode: http.StatusOK, + expectedResponseBody: "[{\"protocol\":\"protocol1\",\"total_messages\":0}]", + }, + { + testName: "fail scenario", + mockError: errors.New("mock_error").Error(), + expectedStatusCode: http.StatusInternalServerError, + expectedResponseBody: "[{\"protocol\":\"protocol1\",\"total_messages\":0,\"error\":\"mock_error\"}]", + }, + } + + for _, inputArgs := range input { + t.Run(inputArgs.testName, func(t *testing.T) { + + service := mockService(func(ctx context.Context) []contributorsHandlerPkg.ProtocolTotalValuesDTO { + return []contributorsHandlerPkg.ProtocolTotalValuesDTO{ + { + Protocol: "protocol1", + Error: inputArgs.mockError, + }, + } + }) + + controller := protocols.NewController(zap.NewNop(), service) + err := controller.GetProtocolsTotalValues(c) + assert.Nil(t, err) + assert.Equal(t, inputArgs.expectedStatusCode, c.Response().StatusCode()) + assert.Equal(t, inputArgs.expectedResponseBody, string(c.Response().Body())) + }) + } + +} + +type mockService func(ctx context.Context) []contributorsHandlerPkg.ProtocolTotalValuesDTO + +func (m mockService) GetProtocolsTotalValues(ctx context.Context) []contributorsHandlerPkg.ProtocolTotalValuesDTO { + return m(ctx) +} diff --git a/api/routes/wormscan/routes.go b/api/routes/wormscan/routes.go index 7ad0c0fd..24f337c1 100644 --- a/api/routes/wormscan/routes.go +++ b/api/routes/wormscan/routes.go @@ -11,6 +11,7 @@ import ( infrasvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/infrastructure" obssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations" opsvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/operations" + protocolssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" relayssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/relays" statssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/stats" trxsvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions" @@ -20,6 +21,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/infrastructure" "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/observations" "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/operations" + "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/protocols" "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/relays" "github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/stats" @@ -51,6 +53,7 @@ func RegisterRoutes( relaysService *relayssvc.Service, operationsService *opsvc.Service, statsService *statssvc.Service, + protocolsService *protocolssvc.Service, ) { // Set up controllers @@ -62,7 +65,8 @@ func RegisterRoutes( transactionCtrl := transactions.NewController(transactionsService, rootLogger) relaysCtrl := relays.NewController(relaysService, rootLogger) opsCtrl := operations.NewController(operationsService, rootLogger) - statsCrtl := stats.NewController(statsService, rootLogger) + statsCtrl := stats.NewController(statsService, rootLogger) + contributorsCtrl := protocols.NewController(rootLogger, protocolsService) // Set up route handlers api := app.Group("/api/v1") @@ -88,8 +92,9 @@ func RegisterRoutes( api.Get("/transactions/:chain/:emitter/:sequence", transactionCtrl.GetTransactionByID) // stats custom endpoints - api.Get("/top-symbols-by-volume", statsCrtl.GetTopSymbolsByVolume) - api.Get("/top-100-corridors", statsCrtl.GetTopCorridors) + api.Get("/top-symbols-by-volume", statsCtrl.GetTopSymbolsByVolume) + api.Get("/top-100-corridors", statsCtrl.GetTopCorridors) + api.Get("/protocols/stats", contributorsCtrl.GetProtocolsTotalValues) // operations resource operations := api.Group("/operations") diff --git a/common/dbconsts/consts.go b/common/dbconsts/consts.go new file mode 100644 index 00000000..5f687d0f --- /dev/null +++ b/common/dbconsts/consts.go @@ -0,0 +1,7 @@ +package dbconsts + +// influx-db constants +const ( + ProtocolsActivityMeasurement = "protocols_activity" + ProtocolsStatsMeasurement = "protocols_stats_v1" +) diff --git a/deploy/api/api-service.yaml b/deploy/api/api-service.yaml index 19139134..19530b40 100644 --- a/deploy/api/api-service.yaml +++ b/deploy/api/api-service.yaml @@ -74,7 +74,7 @@ spec: valueFrom: configMapKeyRef: name: config - key: redis-prefix + key: redis-prefix - name: WORMSCAN_DB_URL valueFrom: secretKeyRef: @@ -94,7 +94,7 @@ spec: valueFrom: configMapKeyRef: name: config - key: redis-prefix + key: redis-prefix - name: WORMSCAN_CACHE_ENABLED value: "true" - name: WORMSCAN_CACHE_TVLKEY @@ -139,6 +139,18 @@ spec: configMapKeyRef: name: config key: influxdb-bucket-24-hours + - name: WORMSCAN_PROTOCOLSSTATSVERSION + valueFrom: + configMapKeyRef: + name: config + key: protocols-stats-version + - name: WORMSCAN_PROTOCOLSACTIVITYVERSION + valueFrom: + configMapKeyRef: + name: config + key: protocols-activity-version + - name: WORMSCAN_PROTOCOLS + value: {{ .WORMSCAN_PROTOCOLS }} resources: limits: memory: {{ .RESOURCES_LIMITS_MEMORY }} diff --git a/deploy/api/env/production-mainnet.env b/deploy/api/env/production-mainnet.env index bfec296a..675fef72 100644 --- a/deploy/api/env/production-mainnet.env +++ b/deploy/api/env/production-mainnet.env @@ -20,3 +20,4 @@ WORMSCAN_RATELIMIT_MAX=1000 WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 WORMSCAN_VAAPAYLOADPARSER_ENABLED=true +WORMSCAN_PROTOCOLS= diff --git a/deploy/api/env/production-testnet.env b/deploy/api/env/production-testnet.env index ea22274c..56fa5a69 100644 --- a/deploy/api/env/production-testnet.env +++ b/deploy/api/env/production-testnet.env @@ -19,4 +19,5 @@ WORMSCAN_RATELIMIT_ENABLED=true WORMSCAN_RATELIMIT_MAX=100 WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 -WORMSCAN_VAAPAYLOADPARSER_ENABLED=true \ No newline at end of file +WORMSCAN_VAAPAYLOADPARSER_ENABLED=true +WORMSCAN_PROTOCOLS= \ No newline at end of file diff --git a/deploy/api/env/staging-mainnet.env b/deploy/api/env/staging-mainnet.env index 9fe767a5..b9f2a903 100644 --- a/deploy/api/env/staging-mainnet.env +++ b/deploy/api/env/staging-mainnet.env @@ -19,4 +19,5 @@ WORMSCAN_RATELIMIT_ENABLED=true WORMSCAN_RATELIMIT_MAX=100 WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 -WORMSCAN_VAAPAYLOADPARSER_ENABLED=true \ No newline at end of file +WORMSCAN_VAAPAYLOADPARSER_ENABLED=true +WORMSCAN_PROTOCOLS=allbridge,mayan \ No newline at end of file diff --git a/deploy/api/env/staging-testnet.env b/deploy/api/env/staging-testnet.env index 2b1b826b..ab8d2207 100644 --- a/deploy/api/env/staging-testnet.env +++ b/deploy/api/env/staging-testnet.env @@ -19,4 +19,5 @@ WORMSCAN_RATELIMIT_ENABLED=true WORMSCAN_RATELIMIT_MAX=100 WORMSCAN_VAAPAYLOADPARSER_URL= WORMSCAN_VAAPAYLOADPARSER_TIMEOUT=10 -WORMSCAN_VAAPAYLOADPARSER_ENABLED=true \ No newline at end of file +WORMSCAN_VAAPAYLOADPARSER_ENABLED=true +WORMSCAN_PROTOCOLS= \ No newline at end of file diff --git a/deploy/common/configmap.yaml b/deploy/common/configmap.yaml index a5fddcc9..41eba82d 100644 --- a/deploy/common/configmap.yaml +++ b/deploy/common/configmap.yaml @@ -13,3 +13,5 @@ data: influxdb-bucket-infinite: {{ .INFLUX_BUCKET_INFINITE }} influxdb-bucket-30-days: {{ .INFLUX_BUCKET_30_DAYS }} influxdb-bucket-24-hours: {{ .INFLUX_BUCKET_24_HOURS }} + protocols-activity-version: {{ .PROTOCOLS_ACTIVITY_VERSION }} + protocols-stats-version: {{ .PROTOCOLS_STATS_VERSION }} diff --git a/deploy/common/env/production-mainnet.env b/deploy/common/env/production-mainnet.env index 474b7087..efaaa536 100644 --- a/deploy/common/env/production-mainnet.env +++ b/deploy/common/env/production-mainnet.env @@ -29,4 +29,7 @@ POLYGON_URL= SEI_URL= SOLANA_URL= SUI_URL= -TERRA_URL= \ No newline at end of file +TERRA_URL= +# protocols jobs +PROTOCOLS_STATS_VERSION=v1 +PROTOCOLS_ACTIVITY_VERSION=v1 \ No newline at end of file diff --git a/deploy/common/env/production-testnet.env b/deploy/common/env/production-testnet.env index 956a9381..2194e804 100644 --- a/deploy/common/env/production-testnet.env +++ b/deploy/common/env/production-testnet.env @@ -33,4 +33,7 @@ POLYGON_URL= SEI_URL= SOLANA_URL= SUI_URL= -TERRA_URL= \ No newline at end of file +TERRA_URL= +# protocols jobs +PROTOCOLS_STATS_VERSION=v1 +PROTOCOLS_ACTIVITY_VERSION=v1 \ No newline at end of file diff --git a/deploy/common/env/staging-mainnet.env b/deploy/common/env/staging-mainnet.env index 085243fa..52ea3796 100644 --- a/deploy/common/env/staging-mainnet.env +++ b/deploy/common/env/staging-mainnet.env @@ -29,4 +29,7 @@ POLYGON_URL= SEI_URL= SOLANA_URL= SUI_URL= -TERRA_URL= \ No newline at end of file +TERRA_URL= +# protocols jobs +PROTOCOLS_STATS_VERSION=v1 +PROTOCOLS_ACTIVITY_VERSION=v1 \ No newline at end of file diff --git a/deploy/common/env/staging-testnet.env b/deploy/common/env/staging-testnet.env index b9912f4e..924e6f83 100644 --- a/deploy/common/env/staging-testnet.env +++ b/deploy/common/env/staging-testnet.env @@ -33,4 +33,7 @@ POLYGON_URL= SEI_URL= SOLANA_URL= SUI_URL= -TERRA_URL= \ No newline at end of file +TERRA_URL= +# protocols jobs +PROTOCOLS_STATS_VERSION=v1 +PROTOCOLS_ACTIVITY_VERSION=v1 \ No newline at end of file diff --git a/deploy/jobs/env/production-mainnet.env b/deploy/jobs/env/production-mainnet.env index 43ee2957..4adb60c3 100644 --- a/deploy/jobs/env/production-mainnet.env +++ b/deploy/jobs/env/production-mainnet.env @@ -21,4 +21,9 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * PRICES_URI=http://wormscan-notional.wormscan OUTPUT_PATH= #migrate vaa to origintx job -TX_TRACKER_URL= \ No newline at end of file +TX_TRACKER_URL= +#protocols stats job: every hour +PROTOCOLS_STATS_CRONTAB_SCHEDULE='0 * * * *' +#protocols activity job:every hour +PROTOCOLS_ACTIVITY_CRONTAB_SCHEDULE='0 * * * *' +PROTOCOLS_JSON= diff --git a/deploy/jobs/env/production-testnet.env b/deploy/jobs/env/production-testnet.env index b30ae399..7a2254de 100644 --- a/deploy/jobs/env/production-testnet.env +++ b/deploy/jobs/env/production-testnet.env @@ -21,4 +21,9 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * PRICES_URI=http://wormscan-notional.wormscan-testnet OUTPUT_PATH= #migrate vaa to origintx job -TX_TRACKER_URL= \ No newline at end of file +TX_TRACKER_URL= +#protocols stats job: every hour +PROTOCOLS_STATS_CRONTAB_SCHEDULE='0 * * * *' +#protocols activity job:every hour +PROTOCOLS_ACTIVITY_CRONTAB_SCHEDULE='0 * * * *' +PROTOCOLS_JSON= diff --git a/deploy/jobs/env/staging-mainnet.env b/deploy/jobs/env/staging-mainnet.env index 633eb6a6..7457303b 100644 --- a/deploy/jobs/env/staging-mainnet.env +++ b/deploy/jobs/env/staging-mainnet.env @@ -21,4 +21,9 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * PRICES_URI=http://wormscan-notional.wormscan OUTPUT_PATH= #migrate vaa to origintx job -TX_TRACKER_URL= \ No newline at end of file +TX_TRACKER_URL= +#protocols stats job: every hour +PROTOCOLS_STATS_CRONTAB_SCHEDULE=0 * * * * +#protocols activity job:every hour +PROTOCOLS_ACTIVITY_CRONTAB_SCHEDULE=0 * * * * +PROTOCOLS_JSON= diff --git a/deploy/jobs/env/staging-testnet.env b/deploy/jobs/env/staging-testnet.env index 67ba5095..dd31ba76 100644 --- a/deploy/jobs/env/staging-testnet.env +++ b/deploy/jobs/env/staging-testnet.env @@ -21,4 +21,9 @@ HISTORICAL_PRICES_CRONTAB_SCHEDULE=0 1 * * * PRICES_URI=http://wormscan-notional.wormscan-testnet OUTPUT_PATH= #migrate vaas to origintx job -TX_TRACKER_URL= \ No newline at end of file +TX_TRACKER_URL= +#protocols stats job: every hour +PROTOCOLS_STATS_CRONTAB_SCHEDULE='0 * * * *' +#protocols activity job:every hour +PROTOCOLS_ACTIVITY_CRONTAB_SCHEDULE='0 * * * *' +PROTOCOLS_JSON= diff --git a/deploy/jobs/protocols-activity.yaml b/deploy/jobs/protocols-activity.yaml new file mode 100644 index 00000000..df3a3efd --- /dev/null +++ b/deploy/jobs/protocols-activity.yaml @@ -0,0 +1,50 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: protocols-activity-1h + namespace: {{ .NAMESPACE }} +spec: #cronjob specs + schedule: {{ .PROTOCOLS_ACTIVITY_CRONTAB_SCHEDULE }} + jobTemplate: + spec: # job specs + template: + spec: # pod specs + containers: + - name: protocols-activity-1h + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_PROTOCOLS_ACTIVITY + - name: INFLUX_URL + valueFrom: + configMapKeyRef: + name: config + key: influxdb-url + - name: INFLUX_TOKEN + valueFrom: + secretKeyRef: + name: influxdb + key: token + - name: INFLUX_ORGANIZATION + valueFrom: + configMapKeyRef: + name: config + key: influxdb-organization + - name: INFLUX_BUCKET_30_DAYS + valueFrom: + configMapKeyRef: + name: config + key: influxdb-bucket-30-days + - name: ACTIVITY_VERSION + valueFrom: + configMapKeyRef: + name: config + key: protocols-activity-version + - name: PROTOCOLS_JSON + value: {{ .PROTOCOLS_JSON }} + restartPolicy: OnFailure \ No newline at end of file diff --git a/deploy/jobs/protocols-stats.yaml b/deploy/jobs/protocols-stats.yaml new file mode 100644 index 00000000..a883d163 --- /dev/null +++ b/deploy/jobs/protocols-stats.yaml @@ -0,0 +1,50 @@ +apiVersion: batch/v1 +kind: CronJob +metadata: + name: protocols-stats-1h + namespace: {{ .NAMESPACE }} +spec: #cronjob specs + schedule: {{ .PROTOCOLS_STATS_CRONTAB_SCHEDULE }} + jobTemplate: + spec: # job specs + template: + spec: # pod specs + containers: + - name: protocols-stats-1h + image: {{ .IMAGE_NAME }} + imagePullPolicy: Always + env: + - name: ENVIRONMENT + value: {{ .ENVIRONMENT }} + - name: LOG_LEVEL + value: {{ .LOG_LEVEL }} + - name: JOB_ID + value: JOB_PROTOCOLS_STATS + - name: INFLUX_URL + valueFrom: + configMapKeyRef: + name: config + key: influxdb-url + - name: INFLUX_TOKEN + valueFrom: + secretKeyRef: + name: influxdb + key: token + - name: INFLUX_ORGANIZATION + valueFrom: + configMapKeyRef: + name: config + key: influxdb-organization + - name: INFLUX_BUCKET_30_DAYS + valueFrom: + configMapKeyRef: + name: config + key: influxdb-bucket-30-days + - name: STATS_VERSION + valueFrom: + configMapKeyRef: + name: config + key: protocols-stats-version + - name: PROTOCOLS_JSON + value: {{ .PROTOCOLS_JSON }} + restartPolicy: OnFailure \ No newline at end of file diff --git a/jobs/cmd/main.go b/jobs/cmd/main.go index 877a256e..15bc6562 100644 --- a/jobs/cmd/main.go +++ b/jobs/cmd/main.go @@ -2,7 +2,13 @@ package main import ( "context" + "encoding/json" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/wormhole-foundation/wormhole-explorer/common/configuration" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/stats" "log" + "net/http" "os" "strings" "time" @@ -29,10 +35,10 @@ type exitCode int func main() { defer handleExit() - context := context.Background() + ctx := context.Background() // get the config - cfg, errConf := config.New(context) + cfg, errConf := config.New(ctx) if errConf != nil { log.Fatal("error creating config", errConf) } @@ -43,36 +49,45 @@ func main() { var err error switch cfg.JobID { case jobs.JobIDNotional: - nCfg, errCfg := config.NewNotionalConfiguration(context) + nCfg, errCfg := config.NewNotionalConfiguration(ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) } - notionalJob := initNotionalJob(context, nCfg, logger) + notionalJob := initNotionalJob(ctx, nCfg, logger) err = notionalJob.Run() + case jobs.JobIDTransferReport: - aCfg, errCfg := config.NewTransferReportConfiguration(context) + aCfg, errCfg := config.NewTransferReportConfiguration(ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) } - transferReport := initTransferReportJob(context, aCfg, logger) - err = transferReport.Run(context) + transferReport := initTransferReportJob(ctx, aCfg, logger) + err = transferReport.Run(ctx) + case jobs.JobIDHistoricalPrices: - hCfg, errCfg := config.NewHistoricalPricesConfiguration(context) + hCfg, errCfg := config.NewHistoricalPricesConfiguration(ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) } - historyPrices := initHistoricalPricesJob(context, hCfg, logger) - err = historyPrices.Run(context) + historyPrices := initHistoricalPricesJob(ctx, hCfg, logger) + err = historyPrices.Run(ctx) case jobs.JobIDMigrationSourceTx: - mCfg, errCfg := config.NewMigrateSourceTxConfiguration(context) + mCfg, errCfg := config.NewMigrateSourceTxConfiguration(ctx) if errCfg != nil { log.Fatal("error creating config", errCfg) } chainID := sdk.ChainID(mCfg.ChainID) - migrationJob := initMigrateSourceTxJob(context, mCfg, chainID, logger) - err = migrationJob.Run(context) + migrationJob := initMigrateSourceTxJob(ctx, mCfg, chainID, logger) + err = migrationJob.Run(ctx) + + case jobs.JobIDProtocolsStats: + statsJob := initProtocolStatsJob(ctx, logger) + err = statsJob.Run(ctx) + case jobs.JobIDProtocolsActivity: + activityJob := initProtocolActivityJob(ctx, logger) + err = activityJob.Run(ctx) default: logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID)) } @@ -156,6 +171,52 @@ func initMigrateSourceTxJob(ctx context.Context, cfg *config.MigrateSourceTxConf return migration.NewMigrationSourceChainTx(db.Database, cfg.PageSize, sdk.ChainID(cfg.ChainID), fromDate, toDate, txTrackerAPIClient, sleepTime, logger) } +func initProtocolStatsJob(ctx context.Context, logger *zap.Logger) *stats.ProtocolsStatsJob { + cfgJob, errCfg := configuration.LoadFromEnv[config.ProtocolsStatsConfiguration](ctx) + if errCfg != nil { + log.Fatal("error creating config", errCfg) + } + errUnmarshal := json.Unmarshal([]byte(cfgJob.ProtocolsJson), &cfgJob.Protocols) + if errUnmarshal != nil { + log.Fatal("error unmarshalling protocols config", errUnmarshal) + } + dbClient := influxdb2.NewClient(cfgJob.InfluxUrl, cfgJob.InfluxToken) + dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucket30Days) + statsFetchers := make([]stats.ClientStats, 0, len(cfgJob.Protocols)) + for _, c := range cfgJob.Protocols { + cs := stats.NewHttpRestClientStats(c.Name, + c.Url, + logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url)), + &http.Client{}, + ) + statsFetchers = append(statsFetchers, cs) + } + return stats.NewProtocolsStatsJob(dbWriter, logger, cfgJob.StatsVersion, statsFetchers...) +} + +func initProtocolActivityJob(ctx context.Context, logger *zap.Logger) *activity.ProtocolsActivityJob { + cfgJob, errCfg := configuration.LoadFromEnv[config.ProtocolsStatsConfiguration](ctx) + if errCfg != nil { + log.Fatal("error creating config", errCfg) + } + errUnmarshal := json.Unmarshal([]byte(cfgJob.ProtocolsJson), &cfgJob.Protocols) + if errUnmarshal != nil { + log.Fatal("error unmarshalling protocols config", errUnmarshal) + } + dbClient := influxdb2.NewClient(cfgJob.InfluxUrl, cfgJob.InfluxToken) + dbWriter := dbClient.WriteAPIBlocking(cfgJob.InfluxOrganization, cfgJob.InfluxBucket30Days) + activityFetchers := make([]activity.ClientActivity, 0, len(cfgJob.Protocols)) + for _, c := range cfgJob.Protocols { + builder, ok := activity.ActivitiesClientsFactory[c.Name] + if !ok { + log.Fatal("error creating protocol activity fetcher. Unknown protocol:", c.Name, errCfg) + } + cs := builder(c.Name, c.Url, logger.With(zap.String("protocol", c.Name), zap.String("url", c.Url))) + activityFetchers = append(activityFetchers, cs) + } + return activity.NewProtocolActivityJob(dbWriter, logger, cfgJob.ActivityVersion, activityFetchers...) +} + func handleExit() { if r := recover(); r != nil { if e, ok := r.(exitCode); ok { diff --git a/jobs/config/config.go b/jobs/config/config.go index 5904786c..da9832fd 100644 --- a/jobs/config/config.go +++ b/jobs/config/config.go @@ -57,15 +57,33 @@ type MigrateSourceTxConfiguration struct { SleepTimeSeconds int64 `env:"SLEEP_TIME_SECONDS,default=5"` } +type ProtocolsStatsConfiguration struct { + InfluxUrl string `env:"INFLUX_URL"` + InfluxToken string `env:"INFLUX_TOKEN"` + InfluxOrganization string `env:"INFLUX_ORGANIZATION"` + InfluxBucket30Days string `env:"INFLUX_BUCKET_30_DAYS"` + StatsVersion string `env:"STATS_VERSION"` + ActivityVersion string `env:"ACTIVITY_VERSION"` + ProtocolsJson string `env:"PROTOCOLS_JSON"` + Protocols []Protocol `json:"PROTOCOLS"` +} + +type Protocol struct { + Name string `json:"name"` + Url string `json:"url"` +} + +type ProtocolsActivityConfiguration struct { + ProtocolsStatsConfiguration +} + // New creates a default configuration with the values from .env file and environment variables. func New(ctx context.Context) (*Configuration, error) { _ = godotenv.Load(".env", "../.env") - var configuration Configuration if err := envconfig.Process(ctx, &configuration); err != nil { return nil, err } - return &configuration, nil } diff --git a/jobs/go.mod b/jobs/go.mod index ddeebeb0..b4d26bca 100644 --- a/jobs/go.mod +++ b/jobs/go.mod @@ -5,43 +5,55 @@ go 1.19 require ( github.com/go-redis/redis v6.15.9+incompatible github.com/go-resty/resty/v2 v2.10.0 + github.com/google/uuid v1.3.0 + github.com/influxdata/influxdb-client-go/v2 v2.12.2 github.com/joho/godotenv v1.5.1 + github.com/pkg/errors v0.9.1 github.com/sethvargo/go-envconfig v1.0.0 github.com/shopspring/decimal v1.3.1 + github.com/stretchr/testify v1.8.4 + github.com/test-go/testify v1.1.4 github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230713181709-0425a89e7533 github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229 go.mongodb.org/mongo-driver v1.11.2 - go.uber.org/zap v1.24.0 + go.uber.org/zap v1.25.0 ) require ( github.com/algorand/go-algorand-sdk v1.23.0 // indirect github.com/algorand/go-codec/codec v1.1.8 // indirect - github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect - github.com/cespare/xxhash/v2 v2.1.2 // indirect + github.com/benbjohnson/clock v1.3.5 // indirect + github.com/btcsuite/btcd/btcec/v2 v2.3.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/cosmos/btcutil v1.0.5 // indirect - github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/deepmap/oapi-codegen v1.8.2 // indirect github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect - github.com/ethereum/go-ethereum v1.10.21 // indirect + github.com/ethereum/go-ethereum v1.11.3 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect github.com/golang/snappy v0.0.4 // indirect github.com/holiman/uint256 v1.2.1 // indirect - github.com/klauspost/compress v1.16.3 // indirect - github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect + github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect + github.com/klauspost/compress v1.16.7 // indirect + github.com/montanaflynn/stats v0.7.0 // indirect github.com/mr-tron/base58 v1.2.0 // indirect - github.com/onsi/gomega v1.27.6 // indirect - github.com/pkg/errors v0.9.1 // indirect + github.com/onsi/gomega v1.27.8 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect + github.com/stretchr/objx v0.5.0 // indirect + github.com/tidwall/pretty v1.2.1 // indirect github.com/xdg-go/pbkdf2 v1.0.0 // indirect - github.com/xdg-go/scram v1.1.1 // indirect - github.com/xdg-go/stringprep v1.0.3 // indirect - github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect - go.uber.org/atomic v1.7.0 // indirect - go.uber.org/multierr v1.6.0 // indirect + github.com/xdg-go/scram v1.1.2 // indirect + github.com/xdg-go/stringprep v1.0.4 // indirect + github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a // indirect + go.uber.org/multierr v1.11.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/net v0.17.0 // indirect - golang.org/x/sync v0.1.0 // indirect + golang.org/x/sync v0.3.0 // indirect golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.13.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) replace github.com/wormhole-foundation/wormhole-explorer/common => ../common diff --git a/jobs/go.sum b/jobs/go.sum index 509822ce..b95b81df 100644 --- a/jobs/go.sum +++ b/jobs/go.sum @@ -3,26 +3,35 @@ github.com/algorand/go-algorand-sdk v1.23.0/go.mod h1:7i2peZBcE48kfoxNZnLA+mklKh github.com/algorand/go-codec v1.1.8/go.mod h1:XhzVs6VVyWMLu6cApb9/192gBjGRVGm5cX5j203Heg4= github.com/algorand/go-codec/codec v1.1.8 h1:lsFuhcOH2LiEhpBH3BVUUkdevVmwCRyvb7FCAAPeY6U= github.com/algorand/go-codec/codec v1.1.8/go.mod h1:tQ3zAJ6ijTps6V+wp8KsGDnPC2uhHVC7ANyrtkIY0bA= -github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= -github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= -github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= +github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o= +github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= +github.com/btcsuite/btcd/btcec/v2 v2.3.2 h1:5n0X6hX0Zk+6omWcihdYvdAlGf2DfasC0GMf7DClJ3U= +github.com/btcsuite/btcd/btcec/v2 v2.3.2/go.mod h1:zYzJ8etWJQIv1Ogk7OzpWjowwOdXY1W/17j2MW85J04= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= -github.com/cespare/xxhash/v2 v2.1.2 h1:YRXhKfTDauu4ajMg1TPgFO5jnlC2HCbmLXMcTG5cbYE= -github.com/cespare/xxhash/v2 v2.1.2/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cosmos/btcutil v1.0.5 h1:t+ZFcX77LpKtDBhjucvnOH8C2l2ioGsBNEQ3jef8xFk= github.com/cosmos/btcutil v1.0.5/go.mod h1:IyB7iuqZMJlthe2tkIFL33xPyzbFYP0XVdS8P5lUPis= +github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0= -github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= -github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5ilcvdfma9wOH6Y= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= +github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU= +github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw= +github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= -github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3ol3teJY= -github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= +github.com/ethereum/go-ethereum v1.11.3 h1:uuBkYUJW9aY5JYi3+sqLHz+XWyo5fmn/ab9XcbtVDTU= +github.com/ethereum/go-ethereum v1.11.3/go.mod h1:rBUvAl5cdVrAei9q5lgOU7RSEuPJk1nlBDnS/YSoKQE= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4= +github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= +github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs= +github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg= +github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk= github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= @@ -32,65 +41,108 @@ github.com/go-resty/resty/v2 v2.10.0/go.mod h1:iiP/OpA0CkcL3IGt1O0+/SIItFUbkkyw5 github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So= +github.com/holiman/big v0.0.0-20221017200358-a027dc42d04e h1:pIYdhNkDh+YENVNi3gto8n9hAmRxKxoar0iE6BLucjw= github.com/holiman/uint256 v1.2.1 h1:XRtyuda/zw2l+Bq/38n5XUoEF72aSOu/77Thd9pPp2o= github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= +github.com/influxdata/influxdb-client-go/v2 v2.12.2 h1:uYABKdrEKlYm+++qfKdbgaHKBPmoWR5wpbmj6MBB/2g= +github.com/influxdata/influxdb-client-go/v2 v2.12.2/go.mod h1:YteV91FiQxRdccyJ2cHvj2f/5sq4y4Njqu1fQzsQCOU= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM= +github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= -github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= -github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= -github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/klauspost/compress v1.16.7 h1:2mk3MPGNzKyxErAw8YaohYh69+pa4sIQSC0fPGCFR9I= +github.com/klauspost/compress v1.16.7/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= -github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg= +github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k= +github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc= +github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ= +github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE= +github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc= +github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s= +github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ= +github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= +github.com/montanaflynn/stats v0.7.0 h1:r3y12KyNxj/Sb/iOE46ws+3mS1+MZca1wlHQFPsY/JU= +github.com/montanaflynn/stats v0.7.0/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= -github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= -github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= +github.com/onsi/gomega v1.27.8 h1:gegWiwZjBsf2DgiSbf5hpokZ98JVDMcWkUiigk6/KXc= +github.com/onsi/gomega v1.27.8/go.mod h1:2J8vzI/s+2shY9XHRApDkdgPo1TKT7P2u6fXeJKFnNQ= +github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4= github.com/sethvargo/go-envconfig v1.0.0 h1:1C66wzy4QrROf5ew4KdVw942CQDa55qmlYmw9FZxZdU= github.com/sethvargo/go-envconfig v1.0.0/go.mod h1:Lzc75ghUn5ucmcRGIdGQ33DKJrcjk4kihFYgSTBmjIc= github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5gKV8= github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= +github.com/stretchr/objx v0.5.0 h1:1zr/of2m5FGMsad5YfcqgdqdWrIhu+EBEJRhR1U7z/c= +github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= +github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= +github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= +github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= +github.com/test-go/testify v1.1.4/go.mod h1:rH7cfJo/47vWGdi4GPj16x3/t1xGOj2YxzmNQzk2ghU= github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= -github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs= +github.com/tidwall/pretty v1.2.1 h1:qjsOFOWWQl+N3RsoF5/ssm1pHmJJwhjlSbZ51I6wMl4= +github.com/tidwall/pretty v1.2.1/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU= +github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc= +github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= +github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229 h1:fqcC4qwEVaJfcpqUVKi5+imz+JpxviQYPW4qu3zILz4= github.com/wormhole-foundation/wormhole/sdk v0.0.0-20240109172745-cc0cd9fc5229/go.mod h1:pE/jYet19kY4P3V6mE2+01zvEfxdyBqv6L6HsnSa5uc= github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= -github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= -github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= +github.com/xdg-go/scram v1.1.2 h1:FHX5I5B4i4hKRVRBCFRxq1iQRej7WO3hhBuJf+UUySY= +github.com/xdg-go/scram v1.1.2/go.mod h1:RT/sEzTbU5y00aCK8UOx6R7YryM0iF1N2MOmC3kKLN4= github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= -github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/xdg-go/stringprep v1.0.4 h1:XLI/Ng3O1Atzq0oBs3TWm+5ZVgkq2aqdlvP9JtoZ6c8= +github.com/xdg-go/stringprep v1.0.4/go.mod h1:mPGuuIYwz7CmR2bT9j4GbQqutWS1zV24gijq1dTyGkM= github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= +github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a h1:fZHgsYlfvtyqToslyjUt3VOPF4J7aK/3MPcK7xp3PDk= +github.com/youmark/pkcs8 v0.0.0-20201027041543-1326539a0a0a/go.mod h1:ul22v+Nro/R083muKhosV54bj5niojjWZvU8xrevuH4= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= github.com/yuin/goldmark v1.4.13/go.mod h1:6yULJ656Px+3vBD8DxQVa3kxgyrAnzto9xy5taEt/CY= go.mongodb.org/mongo-driver v1.11.2 h1:+1v2rDQUWNcGW7/7E0Jvdz51V38XXxJfhzbV17aNHCw= go.mongodb.org/mongo-driver v1.11.2/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= -go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= -go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= -go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= -go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= -go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= -go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= -go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/goleak v1.2.0 h1:xqgm/S+aQvhWFTtR0XK3Jvg7z8kGV8P4X14IzwN3Eqk= +go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= +go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= +go.uber.org/zap v1.25.0 h1:4Hvk6GtkucQ790dqmj7l1eEnRdKm3k3ZUrUMS2d5+5c= +go.uber.org/zap v1.25.0/go.mod h1:JIAUzQIH94IC4fOJQm7gMmBJP5k7wQfdcnYdPoEXJYk= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20200302210943-78000ba7a073/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.14.0 h1:wBqGXzWJW6m1XrIKlAH0Hs1JJ7+9KBwnIO8v66Q9cHc= @@ -98,7 +150,10 @@ golang.org/x/crypto v0.14.0/go.mod h1:MVFd36DqK4CsrnJYDkBA3VC4m2GkXAM0PvzMCn4JQf golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/mod v0.6.0-dev.0.20220419223038-86c51ed26bb4/go.mod h1:jJ57K6gSWd91VN4djpZkiMVwK6gcyfeH4XE8wZrZaV4= golang.org/x/mod v0.8.0/go.mod h1:iBbtSCu2XBx23ZKBPSOrRkjjQPZFPuis4dIYUhu/chs= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA= +golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= @@ -110,10 +165,20 @@ golang.org/x/net v0.17.0/go.mod h1:NxSsAGuq816PNPmqtQdLE42eU2Fs7NoRIZrHJAlaCOE= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= +golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -123,6 +188,7 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/term v0.5.0/go.mod h1:jMB1sMXY+tzblOD4FWmEbocvup2/aLOaQEp7JmGp78k= @@ -130,16 +196,21 @@ golang.org/x/term v0.8.0/go.mod h1:xPskH00ivmX89bAKVGSKKtLOWNx2+17Eiy94tnKShWo= golang.org/x/term v0.13.0/go.mod h1:LTmsnFJwVN6bCy1rVCoS+qHT1HhALEFxKncY3WNNh4U= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= +golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= +golang.org/x/text v0.3.8/go.mod h1:E6s5w1FMmriuDzIBO73fBruAKo1PCIq6d2Q6DHfQ8WQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= +golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/time v0.3.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= +golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= golang.org/x/tools v0.6.0/go.mod h1:Xwgl3UAJ/d3gWutnCtw505GrjyAbvKui8lOU390QaIU= @@ -149,7 +220,10 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/jobs/jobs/jobs.go b/jobs/jobs/jobs.go index 7785104b..a72a8564 100644 --- a/jobs/jobs/jobs.go +++ b/jobs/jobs/jobs.go @@ -1,15 +1,19 @@ // Package jobs define an interface to execute jobs package jobs +import "context" + // JobIDNotional is the job id for notional job. const ( JobIDNotional = "JOB_NOTIONAL_USD" JobIDTransferReport = "JOB_TRANSFER_REPORT" JobIDHistoricalPrices = "JOB_HISTORICAL_PRICES" JobIDMigrationSourceTx = "JOB_MIGRATE_SOURCE_TX" + JobIDProtocolsStats = "JOB_PROTOCOLS_STATS" + JobIDProtocolsActivity = "JOB_PROTOCOLS_ACTIVITY" ) // Job is the interface for jobs. type Job interface { - Run() error + Run(ctx context.Context) error } diff --git a/jobs/jobs/protocols/activity/activity.go b/jobs/jobs/protocols/activity/activity.go new file mode 100644 index 00000000..bfc54c08 --- /dev/null +++ b/jobs/jobs/protocols/activity/activity.go @@ -0,0 +1,82 @@ +package activity + +import ( + "context" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" + "go.uber.org/zap" + "strconv" + "sync" + "time" +) + +// NewProtocolActivityJob creates an instance of the job implementation. +func NewProtocolActivityJob(statsDB api.WriteAPIBlocking, logger *zap.Logger, version string, activityFetchers ...ClientActivity) *ProtocolsActivityJob { + return &ProtocolsActivityJob{ + statsDB: statsDB, + logger: logger.With(zap.String("module", "ProtocolsActivityJob")), + activityFetchers: activityFetchers, + version: version, + } +} + +func (m *ProtocolsActivityJob) Run(ctx context.Context) error { + + clientsQty := len(m.activityFetchers) + wg := sync.WaitGroup{} + wg.Add(clientsQty) + errs := make(chan error, clientsQty) + ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour + from := ts.Add(-1 * time.Hour) + m.logger.Info("running protocols activity job ", zap.Time("from", from), zap.Time("to", ts)) + for _, cs := range m.activityFetchers { + go func(c ClientActivity) { + defer wg.Done() + activity, err := c.Get(ctx, from, ts) + if err != nil { + errs <- err + return + } + errs <- m.updateActivity(ctx, c.ProtocolName(), m.version, activity, from) + }(cs) + } + + wg.Wait() + close(errs) + for err := range errs { + if err != nil { + return err + } + } + + return nil +} + +func (m *ProtocolsActivityJob) updateActivity(ctx context.Context, protocol, version string, activity repositories.ProtocolActivity, ts time.Time) error { + + points := make([]*write.Point, 0, len(activity.Activities)) + + for i := range activity.Activities { + point := influxdb2. + NewPointWithMeasurement(dbconsts.ProtocolsActivityMeasurement). + AddTag("protocol", protocol). + AddTag("emitter_chain_id", strconv.FormatUint(activity.Activities[i].EmitterChainID, 10)). + AddTag("destination_chain_id", strconv.FormatUint(activity.Activities[i].DestinationChainID, 10)). + AddTag("version", version). + AddField("total_value_secure", activity.TotalValueSecure). + AddField("total_value_transferred", activity.TotalValueTransferred). + AddField("txs", activity.Activities[i].Txs). + AddField("total_usd", activity.Activities[i].TotalUSD). + SetTime(ts) + points = append(points, point) + } + + err := m.statsDB.WritePoint(ctx, points...) + if err != nil { + m.logger.Error("failed updating protocol Activities in influxdb", zap.Error(err), zap.String("protocol", protocol)) + } + return err +} diff --git a/jobs/jobs/protocols/activity/activity_test.go b/jobs/jobs/protocols/activity/activity_test.go new file mode 100644 index 00000000..b9426194 --- /dev/null +++ b/jobs/jobs/protocols/activity/activity_test.go @@ -0,0 +1,80 @@ +package activity_test + +import ( + "context" + "errors" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/mock" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "go.uber.org/zap" + "testing" + "time" +) + +func Test_ProtocolsActivityJob_Succeed(t *testing.T) { + var mockErr error + activityFetcher := &mockActivityFetch{} + act := repositories.ProtocolActivity{ + Activities: []repositories.Activity{ + { + EmitterChainID: 1, + DestinationChainID: 2, + Txs: 150, + TotalUSD: 250000, + }, + }, + } + + activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(act, mockErr) + activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") + mockWriterDB := &mocks.MockWriterApi{} + mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) + + job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) + resultErr := job.Run(context.Background()) + assert.Nil(t, resultErr) +} + +func Test_ProtocolsActivityJob_FailFetching(t *testing.T) { + var mockErr error + activityFetcher := &mockActivityFetch{} + activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(repositories.ProtocolActivity{}, errors.New("mocked_error_fetch")) + activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") + mockWriterDB := &mocks.MockWriterApi{} + mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) + + job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) + resultErr := job.Run(context.Background()) + assert.NotNil(t, resultErr) + assert.Equal(t, "mocked_error_fetch", resultErr.Error()) +} + +func Test_ProtocolsActivityJob_FailedUpdatingDB(t *testing.T) { + var mockErr error + activityFetcher := &mockActivityFetch{} + activityFetcher.On("Get", mock.Anything, mock.Anything, mock.Anything).Return(repositories.ProtocolActivity{}, mockErr) + activityFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") + mockWriterDB := &mocks.MockWriterApi{} + mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(errors.New("mocked_error_update_db")) + + job := activity.NewProtocolActivityJob(mockWriterDB, zap.NewNop(), "v1", activityFetcher) + resultErr := job.Run(context.Background()) + assert.NotNil(t, resultErr) + assert.Equal(t, "mocked_error_update_db", resultErr.Error()) +} + +type mockActivityFetch struct { + mock.Mock +} + +func (m *mockActivityFetch) Get(ctx context.Context, from, to time.Time) (repositories.ProtocolActivity, error) { + args := m.Called(ctx, from, to) + return args.Get(0).(repositories.ProtocolActivity), args.Error(1) +} + +func (m *mockActivityFetch) ProtocolName() string { + args := m.Called() + return args.String(0) +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/allbridge.go b/jobs/jobs/protocols/activity/internal/repositories/allbridge.go new file mode 100644 index 00000000..2b9174e9 --- /dev/null +++ b/jobs/jobs/protocols/activity/internal/repositories/allbridge.go @@ -0,0 +1,141 @@ +package repositories + +import ( + "context" + "encoding/json" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "go.uber.org/zap" + "io" + "net/http" + "strconv" + "time" +) + +func NewAllBridgeRestClient(name, url string, logger *zap.Logger, httpClient commons.HttpDo) *AllBridgeRestClient { + return &AllBridgeRestClient{ + name: name, + url: url, + logger: logger, + client: httpClient, + } +} + +type AllBridgeRestClient struct { + name string + url string + client commons.HttpDo + logger *zap.Logger +} + +func (d *AllBridgeRestClient) ProtocolName() string { + return d.name +} + +func (d *AllBridgeRestClient) Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { + decoratedLogger := d.logger + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + q := req.URL.Query() + q.Set("from", from.Format(time.RFC3339)) + q.Set("to", to.Format(time.RFC3339)) + req.URL.RawQuery = q.Encode() + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", + zap.String("response_body", string(body)), + ) + return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. url:%s - status_code:%d", d.url, resp.StatusCode) + } + + var temp allBridgeActivity + err = json.Unmarshal(body, &temp) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + } + + return temp.toProtocolActivity() +} + +type allBridgeActivity struct { + TotalValueSecured string `json:"total_value_secure"` + TotalValueTransferred string `json:"total_value_transferred"` + Activities []struct { + EmitterChainID uint64 `json:"emitter_chain_id"` + DestinationChainID uint64 `json:"destination_chain_id"` + Txs string `json:"txs"` + TotalUSD string `json:"total_usd"` + } `json:"activity"` +} + +func (m *allBridgeActivity) toProtocolActivity() (ProtocolActivity, error) { + result := ProtocolActivity{} + + totalValueSecured, err := strconv.ParseFloat(m.TotalValueSecured, 64) + if err != nil { + return result, errors.Wrap(err, "failed parsing string TotalValueSecure to float64") + } + result.TotalValueSecure = totalValueSecured + + totalValueTransferred, err := strconv.ParseFloat(m.TotalValueTransferred, 64) + if err != nil { + return result, errors.Wrap(err, "failed parsing string TotalValueTransferred to float64") + } + result.TotalValueTransferred = totalValueTransferred + + for i := range m.Activities { + + act := m.Activities[i] + txs, errTxs := strconv.ParseUint(act.Txs, 10, 64) + if errTxs != nil { + return result, errors.Wrap(errTxs, "failed parsing string txs to uint64") + } + + totalUSD, errTotalUSD := strconv.ParseFloat(act.TotalUSD, 64) + if errTotalUSD != nil { + return result, errors.Wrap(errTxs, "failed parsing string total_usd to float64") + } + + a := Activity{ + EmitterChainID: m.Activities[i].EmitterChainID, + DestinationChainID: m.Activities[i].DestinationChainID, + Txs: txs, + TotalUSD: totalUSD, + } + result.Activities = append(result.Activities, a) + } + + return result, nil +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go b/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go new file mode 100644 index 00000000..4f650805 --- /dev/null +++ b/jobs/jobs/protocols/activity/internal/repositories/allbridge_test.go @@ -0,0 +1,97 @@ +package repositories + +import ( + "bytes" + "context" + "errors" + "github.com/stretchr/testify/assert" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "go.uber.org/zap" + "io" + "net/http" + "testing" + "time" +) + +func Test_AllbridgeRestClientActivity_FailRequestCreation(t *testing.T) { + + a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return nil, nil + })) + _, err := a.Get(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error + assert.NotNil(t, err) +} + +func Test_AllbridgeRestClientActivity_FailedRequestExecution(t *testing.T) { + + a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("mocked_http_client_do") + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "mocked_http_client_do", err.Error()) +} + +func Test_AllbridgeRestClientActivity_Status500(t *testing.T) { + + a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(bytes.NewBufferString("response_body_test")), + }, nil + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "failed retrieving protocol Activities from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) +} + +func Test_AllbridgeRestClientActivity_Status200_FailedReadBody(t *testing.T) { + + a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: &mocks.MockFailReadCloser{}, + }, nil + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "failed reading response body from protocol Activities. url:localhost - status_code:200: mocked_fail_read", err.Error()) +} + +func Test_AllbridgeRestClientActivity_Status200_FailedParsing(t *testing.T) { + + a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("this should be a json")), + }, nil + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "failed unmarshalling response body from protocol Activities. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) +} + +func Test_AllbridgeRestClientActivity_Status200_Succeed(t *testing.T) { + + a := NewAllBridgeRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("{\"activity\":[{\"emitter_chain_id\":5,\"destination_chain_id\":1,\"txs\":\"1827\",\"total_usd\":\"445743.185719500000\"}],\"total_value_secure\":\"0\",\"total_value_transferred\":\"5734947.136079277\"}")), + }, nil + })) + resp, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.Nil(t, err) + assert.Equal(t, float64(0), resp.TotalValueSecure) + assert.Equal(t, 5734947.136079277, resp.TotalValueTransferred) + assert.Equal(t, 1, len(resp.Activities)) + assert.Equal(t, uint64(5), resp.Activities[0].EmitterChainID) + assert.Equal(t, uint64(1), resp.Activities[0].DestinationChainID) + assert.Equal(t, uint64(1827), resp.Activities[0].Txs) + assert.Equal(t, 445743.185719500000, resp.Activities[0].TotalUSD) +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/mayan.go b/jobs/jobs/protocols/activity/internal/repositories/mayan.go new file mode 100644 index 00000000..74f8032a --- /dev/null +++ b/jobs/jobs/protocols/activity/internal/repositories/mayan.go @@ -0,0 +1,128 @@ +package repositories + +import ( + "context" + "encoding/json" + "github.com/google/uuid" + "github.com/pkg/errors" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons" + "go.uber.org/zap" + "io" + "net/http" + "strconv" + "time" +) + +func NewMayanRestClient(name, url string, logger *zap.Logger, httpClient commons.HttpDo) *MayanRestClient { + return &MayanRestClient{ + name: name, + url: url, + logger: logger, + client: httpClient, + } +} + +type MayanRestClient struct { + name string + url string + client commons.HttpDo + logger *zap.Logger +} + +func (d *MayanRestClient) ProtocolName() string { + return d.name +} + +func (d *MayanRestClient) Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) { + decoratedLogger := d.logger + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + q := req.URL.Query() + q.Set("from", from.Format(time.RFC3339)) + q.Set("to", to.Format(time.RFC3339)) + req.URL.RawQuery = q.Encode() + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving protocol Activities", + zap.Error(err), + ) + return ProtocolActivity{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", commons.ToJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving protocol Activities: got an invalid response status code", + zap.String("response_body", string(body)), + ) + return ProtocolActivity{}, errors.Errorf("failed retrieving protocol Activities from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from protocol Activities. url:%s - status_code:%d", d.url, resp.StatusCode) + } + + type mayanActivity struct { + ProtocolActivity + Activities []struct { + AlternativeEmitterChainID string `json:"emmiter_chain_id"` // typo is on purpose due to mayan-api returning in that format + DestinationChainID string `json:"destination_chain_id"` + Txs uint64 `json:"txs"` + TotalUSD float64 `json:"total_usd"` + } `json:"activity"` + } + + var mayanResp mayanActivity + err = json.Unmarshal(body, &mayanResp) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return ProtocolActivity{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from protocol Activities. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + } + + result := ProtocolActivity{ + TotalValueTransferred: mayanResp.TotalValueTransferred, + TotalValueSecure: mayanResp.TotalValueSecure, + TotalMessages: mayanResp.TotalMessages, + Volume: mayanResp.Volume, + } + + for _, act := range mayanResp.Activities { + + emitterChainId, errEmitter := strconv.ParseUint(act.AlternativeEmitterChainID, 10, 64) + if errEmitter != nil { + return ProtocolActivity{}, errors.Wrap(errEmitter, "failed parsing protocol activity emitter chain id from string to uint64") + } + + destChainId, errDest := strconv.ParseUint(act.DestinationChainID, 10, 64) + if errDest != nil { + return ProtocolActivity{}, errors.Wrap(errDest, "failed parsing protocol activity destination chain id from string to uint64") + } + + val := Activity{ + EmitterChainID: emitterChainId, + DestinationChainID: destChainId, + Txs: act.Txs, + TotalUSD: act.TotalUSD, + } + result.Activities = append(result.Activities, val) + } + + return result, nil +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go b/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go new file mode 100644 index 00000000..2c4be316 --- /dev/null +++ b/jobs/jobs/protocols/activity/internal/repositories/mayan_test.go @@ -0,0 +1,99 @@ +package repositories + +import ( + "bytes" + "context" + "errors" + "github.com/stretchr/testify/assert" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "go.uber.org/zap" + "io" + "net/http" + "testing" + "time" +) + +func Test_HttpRestClientActivity_FailRequestCreation(t *testing.T) { + + a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return nil, nil + })) + _, err := a.Get(nil, time.Now(), time.Now()) // passing ctx nil to force request creation error + assert.NotNil(t, err) +} + +func Test_HttpRestClientActivity_FailedRequestExecution(t *testing.T) { + + a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("mocked_http_client_do") + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "mocked_http_client_do", err.Error()) +} + +func Test_HttpRestClientActivity_Status500(t *testing.T) { + + a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(bytes.NewBufferString("response_body_test")), + }, nil + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "failed retrieving protocol Activities from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) +} + +func Test_HttpRestClientActivity_Status200_FailedReadBody(t *testing.T) { + + a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: &mocks.MockFailReadCloser{}, + }, nil + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "failed reading response body from protocol Activities. url:localhost - status_code:200: mocked_fail_read", err.Error()) +} + +func Test_HttpRestClientActivity_Status200_FailedParsing(t *testing.T) { + + a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("this should be a json")), + }, nil + })) + _, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.NotNil(t, err) + assert.Equal(t, "failed unmarshalling response body from protocol Activities. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) +} + +func Test_HttpRestClientActivity_Status200_Succeed(t *testing.T) { + + a := NewMayanRestClient("protocol_test", "localhost", zap.NewNop(), + mocks.MockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("{\"total_value_secure\":1640898.7106282723,\"total_value_transferred\":2600395.040031102,\"total_messages\":2225,\"activity\":[{\"emmiter_chain_id\":\"1\",\"destination_chain_id\":\"2\",\"txs\":88,\"total_usd\":648500.9762709612}],\"volume\":2761848.9678057004}")), + }, nil + })) + resp, err := a.Get(context.Background(), time.Now(), time.Now()) + assert.Nil(t, err) + assert.Equal(t, 1640898.7106282723, resp.TotalValueSecure) + assert.Equal(t, 2600395.040031102, resp.TotalValueTransferred) + assert.Equal(t, uint64(2225), resp.TotalMessages) + assert.Equal(t, 2761848.9678057004, resp.Volume) + assert.Equal(t, 1, len(resp.Activities)) + assert.Equal(t, uint64(1), resp.Activities[0].EmitterChainID) + assert.Equal(t, uint64(2), resp.Activities[0].DestinationChainID) + assert.Equal(t, uint64(88), resp.Activities[0].Txs) + assert.Equal(t, 648500.9762709612, resp.Activities[0].TotalUSD) +} diff --git a/jobs/jobs/protocols/activity/internal/repositories/repository.go b/jobs/jobs/protocols/activity/internal/repositories/repository.go new file mode 100644 index 00000000..d96e2024 --- /dev/null +++ b/jobs/jobs/protocols/activity/internal/repositories/repository.go @@ -0,0 +1,26 @@ +package repositories + +import ( + "context" + "time" +) + +type ProtocolActivityRepository interface { + Get(ctx context.Context, from, to time.Time) (ProtocolActivity, error) + ProtocolName() string +} + +type ProtocolActivity struct { + TotalValueSecure float64 `json:"total_value_secure"` + TotalValueTransferred float64 `json:"total_value_transferred"` + Volume float64 `json:"volume"` + TotalMessages uint64 `json:"total_messages"` + Activities []Activity `json:"activity"` +} + +type Activity struct { + EmitterChainID uint64 `json:"emitter_chain_id"` + DestinationChainID uint64 `json:"destination_chain_id"` + Txs uint64 `json:"txs"` + TotalUSD float64 `json:"total_usd"` +} diff --git a/jobs/jobs/protocols/activity/types.go b/jobs/jobs/protocols/activity/types.go new file mode 100644 index 00000000..9cafa6a5 --- /dev/null +++ b/jobs/jobs/protocols/activity/types.go @@ -0,0 +1,41 @@ +package activity + +import ( + "context" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/activity/internal/repositories" + "go.uber.org/zap" + "net/http" + "time" +) + +// Protocols +const ( + MayanProtocol = "mayan" + AllBridgeProtocol = "allbridge" +) + +type ProtocolsActivityJob struct { + statsDB api.WriteAPIBlocking + logger *zap.Logger + activityFetchers []ClientActivity + version string +} + +// ClientActivity Abstraction for fetching protocol Activity since each client may have different implementation details. +type ClientActivity interface { + Get(ctx context.Context, from, to time.Time) (repositories.ProtocolActivity, error) + ProtocolName() string +} + +// ActivitiesClientsFactory RestClient Factory to create the right client for each protocol. +var ActivitiesClientsFactory = map[string]func(name, url string, logger *zap.Logger) ClientActivity{ + + MayanProtocol: func(name, url string, logger *zap.Logger) ClientActivity { + return repositories.NewMayanRestClient(name, url, logger, &http.Client{}) + }, + + AllBridgeProtocol: func(name, url string, logger *zap.Logger) ClientActivity { + return repositories.NewAllBridgeRestClient(name, url, logger, &http.Client{}) + }, +} diff --git a/jobs/jobs/protocols/internal/commons/common.go b/jobs/jobs/protocols/internal/commons/common.go new file mode 100644 index 00000000..e8178a6b --- /dev/null +++ b/jobs/jobs/protocols/internal/commons/common.go @@ -0,0 +1,15 @@ +package commons + +import ( + "encoding/json" + "net/http" +) + +func ToJson(headers http.Header) string { + bytes, _ := json.Marshal(headers) + return string(bytes) +} + +type HttpDo interface { + Do(req *http.Request) (*http.Response, error) +} diff --git a/jobs/jobs/protocols/internal/commons/mocks/mocks.go b/jobs/jobs/protocols/internal/commons/mocks/mocks.go new file mode 100644 index 00000000..0dc27222 --- /dev/null +++ b/jobs/jobs/protocols/internal/commons/mocks/mocks.go @@ -0,0 +1,49 @@ +package mocks + +import ( + "context" + "errors" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/test-go/testify/mock" + "net/http" +) + +// MockWriterApi mock influxdb WriterApiBlocking interface +type MockWriterApi struct { + mock.Mock +} + +func (m *MockWriterApi) WriteRecord(ctx context.Context, line ...string) error { + args := m.Called(ctx, line) + return args.Error(0) +} + +func (m *MockWriterApi) WritePoint(ctx context.Context, point ...*write.Point) error { + args := m.Called(ctx, point) + return args.Error(0) +} + +func (m *MockWriterApi) EnableBatching() { +} + +func (m *MockWriterApi) Flush(ctx context.Context) error { + args := m.Called(ctx) + return args.Error(0) +} + +type MockHttpClient func(req *http.Request) (*http.Response, error) + +func (m MockHttpClient) Do(req *http.Request) (*http.Response, error) { + return m(req) +} + +type MockFailReadCloser struct { +} + +func (m *MockFailReadCloser) Read(_ []byte) (n int, err error) { + return 0, errors.New("mocked_fail_read") +} + +func (m *MockFailReadCloser) Close() error { + return nil +} diff --git a/jobs/jobs/protocols/stats/stats.go b/jobs/jobs/protocols/stats/stats.go new file mode 100644 index 00000000..d29e1a46 --- /dev/null +++ b/jobs/jobs/protocols/stats/stats.go @@ -0,0 +1,227 @@ +package stats + +import ( + "context" + "encoding/json" + "github.com/google/uuid" + influxdb2 "github.com/influxdata/influxdb-client-go/v2" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/write" + "github.com/pkg/errors" + "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" + "go.uber.org/zap" + "io" + "net/http" + "strconv" + "sync" + "time" +) + +type ProtocolsStatsJob struct { + statsDB api.WriteAPIBlocking + logger *zap.Logger + statsClientsFetchers []ClientStats + version string +} + +// ClientStats Abstraction for fetching stats since each protocol may have different implementation details. +type ClientStats interface { + Get(ctx context.Context) (Stats, error) + ProtocolName() string +} + +type protocolStats struct { + Stats + Name string +} + +type Stats struct { + TotalValueLocked float64 + TotalMessages uint64 +} + +// NewProtocolsStatsJob creates an instance of the job implementation. +func NewProtocolsStatsJob(statsDB api.WriteAPIBlocking, logger *zap.Logger, version string, statsFetchers ...ClientStats) *ProtocolsStatsJob { + return &ProtocolsStatsJob{ + statsDB: statsDB, + logger: logger.With(zap.String("module", "ProtocolsStatsJob")), + statsClientsFetchers: statsFetchers, + version: version, + } +} + +func (s *ProtocolsStatsJob) Run(ctx context.Context) error { + + clientsQty := len(s.statsClientsFetchers) + wg := sync.WaitGroup{} + wg.Add(clientsQty) + stats := make(chan protocolStats, clientsQty) + var anyError error + + for _, cs := range s.statsClientsFetchers { + go func(c ClientStats) { + defer wg.Done() + st, err := c.Get(ctx) + if err != nil { + anyError = err + return + } + stats <- protocolStats{st, c.ProtocolName()} + }(cs) + } + + wg.Wait() + close(stats) + + err := s.updateStats(ctx, stats) + if err != nil { + anyError = err + } + + return anyError +} + +func (s *ProtocolsStatsJob) updateStats(ctx context.Context, stats <-chan protocolStats) error { + + ts := time.Now().UTC().Truncate(time.Hour) // make minutes and seconds zero, so we only work with date and hour + points := make([]*write.Point, 0, len(stats)) + + for st := range stats { + point := influxdb2. + NewPointWithMeasurement(dbconsts.ProtocolsStatsMeasurement). + AddTag("protocol", st.Name). + AddTag("version", s.version). + AddField("total_messages", st.TotalMessages). + AddField("total_value_locked", st.TotalValueLocked). + SetTime(ts) + + points = append(points, point) + } + + err := s.statsDB.WritePoint(ctx, points...) + if err != nil { + s.logger.Error("failed updating protocol stats in influxdb", zap.Error(err)) + } + return err +} + +// Default implementation of ClientStats interface. Encapsulate the url and http.client for calling a specific external service to retrieve stats +type httpRestClientStats struct { + name string + url string + client httpDo + logger *zap.Logger +} + +type httpDo interface { + Do(req *http.Request) (*http.Response, error) +} + +func NewHttpRestClientStats(name, url string, logger *zap.Logger, httpClient httpDo) ClientStats { + return &httpRestClientStats{ + name: name, + url: url, + logger: logger, + client: httpClient, + } +} + +func (d *httpRestClientStats) ProtocolName() string { + return d.name +} + +func (d *httpRestClientStats) Get(ctx context.Context) (Stats, error) { + + decoratedLogger := d.logger + + req, err := http.NewRequestWithContext(ctx, http.MethodGet, d.url, nil) + if err != nil { + decoratedLogger.Error("failed creating http request for retrieving client stats", + zap.Error(err), + ) + return Stats{}, errors.WithStack(err) + } + + reqId := uuid.New().String() + req.Header.Set("X-Request-ID", reqId) + decoratedLogger = decoratedLogger.With(zap.String("requestID", reqId)) + + resp, err := d.client.Do(req) + if err != nil { + decoratedLogger.Error("failed retrieving client stats", + zap.Error(err), + ) + return Stats{}, errors.WithStack(err) + } + defer resp.Body.Close() + + decoratedLogger = decoratedLogger. + With(zap.String("status_code", http.StatusText(resp.StatusCode))). + With(zap.String("response_headers", toJson(resp.Header))) + + if resp.StatusCode != http.StatusOK { + body, _ := io.ReadAll(resp.Body) + decoratedLogger.Error("error retrieving client stats: got an invalid response status code", + zap.String("response_body", string(body)), + ) + return Stats{}, errors.Errorf("failed retrieving client stats from url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + } + + body, err := io.ReadAll(resp.Body) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err)) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed reading response body from client stats. url:%s - status_code:%d", d.url, resp.StatusCode) + } + var stats Stats + err = json.Unmarshal(body, &stats) + if err != nil { + decoratedLogger.Error("failed reading response body", zap.Error(err), zap.String("response_body", string(body))) + return Stats{}, errors.Wrapf(errors.WithStack(err), "failed unmarshalling response body from client stats. url:%s - status_code:%d - response_body:%s", d.url, resp.StatusCode, string(body)) + } + + return stats, nil + +} + +func toJson(headers http.Header) string { + bytes, _ := json.Marshal(headers) + return string(bytes) +} + +func (rd *Stats) UnmarshalJSON(data []byte) error { + + temp := struct { + TotalValueLocked json.RawMessage `json:"total_value_locked"` + TotalMessages json.RawMessage `json:"total_messages"` + }{} + + if err := json.Unmarshal(data, &temp); err != nil { + return err + } + + if err := parseJSONNumber(temp.TotalValueLocked, &rd.TotalValueLocked); err != nil { + return err + } + + var totalMsg float64 + if err := parseJSONNumber(temp.TotalMessages, &totalMsg); err != nil { + return err + } + + rd.TotalMessages = uint64(totalMsg) + return nil +} + +// parseJSONNumber helps to support both string and numeric JSON values since different protocols return different types for the same fields. +func parseJSONNumber(raw json.RawMessage, dest *float64) error { + var strVal string + if err := json.Unmarshal(raw, &strVal); err == nil { + val, err1 := strconv.ParseFloat(strVal, 64) + if err1 != nil { + return err1 + } + *dest = val + return nil + } + return json.Unmarshal(raw, dest) +} diff --git a/jobs/jobs/protocols/stats/stats_test.go b/jobs/jobs/protocols/stats/stats_test.go new file mode 100644 index 00000000..e9c91c36 --- /dev/null +++ b/jobs/jobs/protocols/stats/stats_test.go @@ -0,0 +1,165 @@ +package stats_test + +import ( + "bytes" + "context" + "errors" + "github.com/stretchr/testify/assert" + "github.com/test-go/testify/mock" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/internal/commons/mocks" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/protocols/stats" + "go.uber.org/zap" + "io" + "net/http" + "testing" +) + +func Test_ProtocolsStatsJob_Succeed(t *testing.T) { + var mockErr error + statsFetcher := &mockStatsFetch{} + statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, mockErr) + statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") + mockWriterDB := &mocks.MockWriterApi{} + mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) + + job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) + resultErr := job.Run(context.Background()) + assert.Nil(t, resultErr) +} + +func Test_ProtocolsStatsJob_FailFetching(t *testing.T) { + var mockErr error + statsFetcher := &mockStatsFetch{} + statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, errors.New("mocked_error_fetch")) + statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") + mockWriterDB := &mocks.MockWriterApi{} + mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(mockErr) + + job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) + resultErr := job.Run(context.Background()) + assert.NotNil(t, resultErr) + assert.Equal(t, "mocked_error_fetch", resultErr.Error()) +} + +func Test_ProtocolsStatsJob_FailedUpdatingDB(t *testing.T) { + var mockErr error + statsFetcher := &mockStatsFetch{} + statsFetcher.On("Get", mock.Anything).Return(stats.Stats{}, mockErr) + statsFetcher.On("ProtocolName", mock.Anything).Return("protocol_test") + mockWriterDB := &mocks.MockWriterApi{} + mockWriterDB.On("WritePoint", mock.Anything, mock.Anything).Return(errors.New("mocked_error_update_db")) + + job := stats.NewProtocolsStatsJob(mockWriterDB, zap.NewNop(), "v1", statsFetcher) + resultErr := job.Run(context.Background()) + assert.NotNil(t, resultErr) + assert.Equal(t, "mocked_error_update_db", resultErr.Error()) +} + +func Test_HttpRestClientStats_FailRequestCreation(t *testing.T) { + + a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), + mockHttpClient(func(req *http.Request) (*http.Response, error) { + return nil, nil + })) + _, err := a.Get(nil) // passing ctx nil to force request creation error + assert.NotNil(t, err) +} + +func Test_HttpRestClientStats_FailedRequestExecution(t *testing.T) { + + a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), + mockHttpClient(func(req *http.Request) (*http.Response, error) { + return nil, errors.New("mocked_http_client_do") + })) + _, err := a.Get(context.Background()) + assert.NotNil(t, err) + assert.Equal(t, "mocked_http_client_do", err.Error()) +} + +func Test_HttpRestClientStats_Status500(t *testing.T) { + + a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), + mockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusInternalServerError, + Body: io.NopCloser(bytes.NewBufferString("response_body_test")), + }, nil + })) + _, err := a.Get(context.Background()) + assert.NotNil(t, err) + assert.Equal(t, "failed retrieving client stats from url:localhost - status_code:500 - response_body:response_body_test", err.Error()) +} + +func Test_HttpRestClientStats_Status200_FailedReadBody(t *testing.T) { + + a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), + mockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: &mockFailReadCloser{}, + }, nil + })) + _, err := a.Get(context.Background()) + assert.NotNil(t, err) + assert.Equal(t, "failed reading response body from client stats. url:localhost - status_code:200: mocked_fail_read", err.Error()) +} + +func Test_HttpRestClientStats_Status200_FailedParsing(t *testing.T) { + + a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), + mockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("this should be a json")), + }, nil + })) + _, err := a.Get(context.Background()) + assert.NotNil(t, err) + assert.Equal(t, "failed unmarshalling response body from client stats. url:localhost - status_code:200 - response_body:this should be a json: invalid character 'h' in literal true (expecting 'r')", err.Error()) +} + +func Test_HttpRestClientStats_Status200_Succeed(t *testing.T) { + + a := stats.NewHttpRestClientStats("protocol_test", "localhost", zap.NewNop(), + mockHttpClient(func(req *http.Request) (*http.Response, error) { + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString("{\"total_value_locked\":\"123\",\"total_messages\":\"456\"}")), + }, nil + })) + resp, err := a.Get(context.Background()) + assert.Nil(t, err) + assert.Equal(t, float64(123), resp.TotalValueLocked) + assert.Equal(t, uint64(456), resp.TotalMessages) +} + +type mockStatsFetch struct { + mock.Mock +} + +func (m *mockStatsFetch) Get(ctx context.Context) (stats.Stats, error) { + args := m.Called(ctx) + return args.Get(0).(stats.Stats), args.Error(1) +} + +func (m *mockStatsFetch) ProtocolName() string { + args := m.Called() + return args.String(0) +} + +type mockHttpClient func(req *http.Request) (*http.Response, error) + +func (m mockHttpClient) Do(req *http.Request) (*http.Response, error) { + return m(req) +} + +type mockFailReadCloser struct { +} + +func (m *mockFailReadCloser) Read(p []byte) (n int, err error) { + return 0, errors.New("mocked_fail_read") +} + +func (m *mockFailReadCloser) Close() error { + return nil +}