[API/PARSER] Chain activity and last transaction endpoints (#175)

* Fix analytic vaa metric and close metric client

* Add analytic component manifest

* Add metrics for cross chain activity endpoint

* Feature/add endpoint get vaa count (#171)

* Add trasaction controller + service

* Init getLastTrx endpoint

* Add endpoint for chain activity

* Add configmap and secrets for influx
Change Dockerfiles to support common library

* Add vaa_count endpoint

* Add cross-chain activity endpoint

* Fix vaa_count endpoint

* Add influx to api-service manifest

* Make response and add docs for chain activity endpoint

* Fix api deploy

* Fix lasl-trx json name

* Fix chain activity endpoint

* Fix doc and endpoint route last-txs

---------

Co-authored-by: Agustin Pazos <agpazos85@gmail.com>
This commit is contained in:
ftocal 2023-03-07 16:25:42 -03:00 committed by GitHub
parent d51695999a
commit 867f1c2962
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
44 changed files with 1364 additions and 43 deletions

View File

@ -3,10 +3,11 @@ FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a
WORKDIR /app
COPY . .
COPY analytic analytic
COPY common common
# Build the Go app
RUN CGO_ENABLED=0 GOOS=linux go build -o "./analytic-pipeline" cmd/main.go
RUN cd analytic && CGO_ENABLED=0 GOOS=linux go build -o "./analytic-pipeline" cmd/main.go
############################
# STEP 2 build a small image
@ -15,6 +16,6 @@ FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/analytic-pipeline" "/analytic-pipeline"
COPY --from=build "/app/analytic/analytic-pipeline" "/analytic-pipeline"
# Run the binary.
ENTRYPOINT ["/analytic-pipeline"]

View File

@ -88,6 +88,8 @@ func main() {
logger.Info("root context cancelled, exiting...")
rootCtxCancel()
logger.Info("Closing metric client ...")
metric.Close()
logger.Info("Closing Http server ...")
server.Stop()
logger.Info("Finished wormhole-explorer-analytic")

View File

@ -2,6 +2,8 @@ package metric
import (
"context"
"strconv"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
@ -16,13 +18,13 @@ type Metric struct {
logger *zap.Logger
}
// New create a new *Metric
// New create a new *Metric.
func New(influxCli influxdb2.Client, organization, bucket string, logger *zap.Logger) *Metric {
writeAPI := influxCli.WriteAPIBlocking(organization, bucket)
return &Metric{influxCli: influxCli, writeApi: writeAPI, logger: logger}
}
// Push implement MetricPushFunc definition
// Push implement MetricPushFunc definition.
func (m *Metric) Push(ctx context.Context, vaa *vaa.VAA) error {
return m.vaaCountMeasurement(ctx, vaa)
}
@ -35,15 +37,14 @@ func (m *Metric) Close() {
// vaaCountMeasurement handle the push of metric point for measurement vaa_count.
func (m *Metric) vaaCountMeasurement(ctx context.Context, vaa *vaa.VAA) error {
measurement := "vaa_count"
point := influxdb2.NewPointWithMeasurement(measurement).
AddField("chainID", uint16(vaa.EmitterChain)).
SetTime(vaa.Timestamp)
// create point for measurement vaa_count.
point := influxdb2.NewPointWithMeasurement("vaa_count").AddTag("chain_id", strconv.Itoa(int(vaa.EmitterChain))).AddField("count", 1).SetTime(vaa.Timestamp.Add(time.Nanosecond * time.Duration(vaa.Sequence)))
// write point to influx
// write point to influx.
err := m.writeApi.WritePoint(ctx, point)
if err != nil {
m.logger.Error("error write metric", zap.String("measurement", measurement),
zap.Uint16("chainID", uint16(vaa.EmitterChain)), zap.Error(err))
zap.Uint16("chain_id", uint16(vaa.EmitterChain)), zap.Error(err))
return err
}
return nil

View File

@ -3,10 +3,11 @@ FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a
WORKDIR /app
COPY . .
COPY api api
COPY common common
# Build the Go app
RUN make build
RUN cd api && make build
############################
# STEP 2 build a small image
@ -15,6 +16,6 @@ FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/api" "/api"
COPY --from=build "/app/api/api" "/api"
# Run the binary.
ENTRYPOINT ["/api"]

View File

@ -505,6 +505,46 @@ const docTemplate = `{
}
}
},
"/api/v1/last-txs": {
"get": {
"description": "Returns the number of transactions [vaa] by a defined time span and sample rate.",
"tags": [
"Wormscan"
],
"operationId": "get-last-transactions",
"parameters": [
{
"type": "string",
"description": "Time Span, default: 1h, examples: 30m, 1h, 1d, 2w, 3mo, 1y, all.",
"name": "timeSpan",
"in": "query"
},
{
"type": "string",
"description": "Sample Rate, default: 1m, examples: 30s, 1m, 1h, 1d, 2w, 3mo, 1y.",
"name": "sampleRate",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/transactions.TransactionCountResult"
}
}
},
"400": {
"description": "Bad Request"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/api/v1/observations": {
"get": {
"description": "Returns all observations.",
@ -1068,6 +1108,55 @@ const docTemplate = `{
}
}
},
"/api/v1/x-chain-activity": {
"get": {
"description": "Returns a list of tx by source chain and destination chain.",
"tags": [
"Wormscan"
],
"operationId": "x-chain-activity",
"parameters": [
{
"type": "string",
"description": "Star time (format: ISO-8601).",
"name": "start_time",
"in": "query"
},
{
"type": "string",
"description": "End time (format: ISO-8601).",
"name": "end_time",
"in": "query"
},
{
"type": "string",
"description": "Renders the results as notional or tx-count (default is notional).",
"name": "by",
"in": "query"
},
{
"type": "string",
"description": "List of apps separated by comma (default is all apps).",
"name": "apps",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/transactions.ChainActivity"
}
},
"400": {
"description": "Bad Request"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/swagger.json": {
"get": {
"description": "Returns the swagger specification for this API.",
@ -2077,6 +2166,62 @@ const docTemplate = `{
}
}
},
"transactions.ChainActivity": {
"type": "object",
"properties": {
"txs": {
"type": "array",
"items": {
"$ref": "#/definitions/transactions.Tx"
}
}
}
},
"transactions.Destination": {
"type": "object",
"properties": {
"chain": {
"type": "integer"
},
"percentage": {
"type": "number"
},
"volume": {
"type": "integer"
}
}
},
"transactions.TransactionCountResult": {
"type": "object",
"properties": {
"count": {
"type": "integer"
},
"time": {
"type": "string"
}
}
},
"transactions.Tx": {
"type": "object",
"properties": {
"chain": {
"type": "integer"
},
"destinations": {
"type": "array",
"items": {
"$ref": "#/definitions/transactions.Destination"
}
},
"percentage": {
"type": "number"
},
"volume": {
"type": "integer"
}
}
},
"vaa.ChainID": {
"type": "integer",
"enum": [

View File

@ -498,6 +498,46 @@
}
}
},
"/api/v1/last-txs": {
"get": {
"description": "Returns the number of transactions [vaa] by a defined time span and sample rate.",
"tags": [
"Wormscan"
],
"operationId": "get-last-transactions",
"parameters": [
{
"type": "string",
"description": "Time Span, default: 1h, examples: 30m, 1h, 1d, 2w, 3mo, 1y, all.",
"name": "timeSpan",
"in": "query"
},
{
"type": "string",
"description": "Sample Rate, default: 1m, examples: 30s, 1m, 1h, 1d, 2w, 3mo, 1y.",
"name": "sampleRate",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "array",
"items": {
"$ref": "#/definitions/transactions.TransactionCountResult"
}
}
},
"400": {
"description": "Bad Request"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/api/v1/observations": {
"get": {
"description": "Returns all observations.",
@ -1061,6 +1101,55 @@
}
}
},
"/api/v1/x-chain-activity": {
"get": {
"description": "Returns a list of tx by source chain and destination chain.",
"tags": [
"Wormscan"
],
"operationId": "x-chain-activity",
"parameters": [
{
"type": "string",
"description": "Star time (format: ISO-8601).",
"name": "start_time",
"in": "query"
},
{
"type": "string",
"description": "End time (format: ISO-8601).",
"name": "end_time",
"in": "query"
},
{
"type": "string",
"description": "Renders the results as notional or tx-count (default is notional).",
"name": "by",
"in": "query"
},
{
"type": "string",
"description": "List of apps separated by comma (default is all apps).",
"name": "apps",
"in": "query"
}
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/transactions.ChainActivity"
}
},
"400": {
"description": "Bad Request"
},
"500": {
"description": "Internal Server Error"
}
}
}
},
"/swagger.json": {
"get": {
"description": "Returns the swagger specification for this API.",
@ -2070,6 +2159,62 @@
}
}
},
"transactions.ChainActivity": {
"type": "object",
"properties": {
"txs": {
"type": "array",
"items": {
"$ref": "#/definitions/transactions.Tx"
}
}
}
},
"transactions.Destination": {
"type": "object",
"properties": {
"chain": {
"type": "integer"
},
"percentage": {
"type": "number"
},
"volume": {
"type": "integer"
}
}
},
"transactions.TransactionCountResult": {
"type": "object",
"properties": {
"count": {
"type": "integer"
},
"time": {
"type": "string"
}
}
},
"transactions.Tx": {
"type": "object",
"properties": {
"chain": {
"type": "integer"
},
"destinations": {
"type": "array",
"items": {
"$ref": "#/definitions/transactions.Destination"
}
},
"percentage": {
"type": "number"
},
"volume": {
"type": "integer"
}
}
},
"vaa.ChainID": {
"type": "integer",
"enum": [

View File

@ -452,6 +452,42 @@ definitions:
next:
type: string
type: object
transactions.ChainActivity:
properties:
txs:
items:
$ref: '#/definitions/transactions.Tx'
type: array
type: object
transactions.Destination:
properties:
chain:
type: integer
percentage:
type: number
volume:
type: integer
type: object
transactions.TransactionCountResult:
properties:
count:
type: integer
time:
type: string
type: object
transactions.Tx:
properties:
chain:
type: integer
destinations:
items:
$ref: '#/definitions/transactions.Destination'
type: array
percentage:
type: number
volume:
type: integer
type: object
vaa.ChainID:
enum:
- 0
@ -883,6 +919,35 @@ paths:
description: Internal Server Error
tags:
- Wormscan
/api/v1/last-txs:
get:
description: Returns the number of transactions [vaa] by a defined time span
and sample rate.
operationId: get-last-transactions
parameters:
- description: 'Time Span, default: 1h, examples: 30m, 1h, 1d, 2w, 3mo, 1y,
all.'
in: query
name: timeSpan
type: string
- description: 'Sample Rate, default: 1m, examples: 30s, 1m, 1h, 1d, 2w, 3mo,
1y.'
in: query
name: sampleRate
type: string
responses:
"200":
description: OK
schema:
items:
$ref: '#/definitions/transactions.TransactionCountResult'
type: array
"400":
description: Bad Request
"500":
description: Internal Server Error
tags:
- Wormscan
/api/v1/observations:
get:
description: Returns all observations.
@ -1254,6 +1319,38 @@ paths:
description: Internal Server Error
tags:
- Wormscan
/api/v1/x-chain-activity:
get:
description: Returns a list of tx by source chain and destination chain.
operationId: x-chain-activity
parameters:
- description: 'Star time (format: ISO-8601).'
in: query
name: start_time
type: string
- description: 'End time (format: ISO-8601).'
in: query
name: end_time
type: string
- description: Renders the results as notional or tx-count (default is notional).
in: query
name: by
type: string
- description: List of apps separated by comma (default is all apps).
in: query
name: apps
type: string
responses:
"200":
description: OK
schema:
$ref: '#/definitions/transactions.ChainActivity'
"400":
description: Bad Request
"500":
description: Internal Server Error
tags:
- Wormscan
/swagger.json:
get:
description: Returns the swagger specification for this API.

View File

@ -9,7 +9,9 @@ require (
github.com/go-redis/redis/v8 v8.11.5
github.com/gofiber/adaptor/v2 v2.1.29
github.com/gofiber/fiber/v2 v2.39.0
github.com/influxdata/influxdb-client-go/v2 v2.4.0
github.com/ipfs/go-log/v2 v2.5.1
github.com/mitchellh/mapstructure v1.5.0
github.com/pkg/errors v0.9.1
github.com/spf13/viper v1.13.0
github.com/swaggo/swag v1.8.9
@ -31,6 +33,7 @@ require (
github.com/cenkalti/backoff/v4 v4.1.3 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
@ -45,6 +48,7 @@ require (
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0 // indirect
github.com/grpc-ecosystem/grpc-gateway/v2 v2.5.0 // indirect
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/ipfs/go-cid v0.2.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
github.com/klauspost/compress v1.15.12 // indirect
@ -60,7 +64,6 @@ require (
github.com/mattn/go-runewidth v0.0.14 // indirect
github.com/matttproud/golang_protobuf_extensions v1.0.4 // indirect
github.com/minio/sha256-simd v1.0.0 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect
github.com/mr-tron/base58 v1.2.0 // indirect
github.com/multiformats/go-base32 v0.0.4 // indirect

View File

@ -106,12 +106,16 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M=
github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU=
github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f h1:U5y3Y5UE0w7amNe7Z5G/twsBW0KEalRQXZzf8ufSh9I=
github.com/desertbit/timer v0.0.0-20180107155436-c41aec40b27f/go.mod h1:xH/i4TFMt8koVQZ6WFms69WAsDWr2XsYL3Hkl7jkoLE=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
@ -139,11 +143,14 @@ github.com/frankban/quicktest v1.14.3 h1:FJKSZTDHjyhriyC81FLQ0LY93eSai0ZyR/ZIkd3
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.5.4 h1:jRbGcIw6P2Meqdwuo0H1p6JVLbL5DHKAKlYndzMwVZI=
github.com/fsnotify/fsnotify v1.5.4/go.mod h1:OVB6XrOHzAwXMpEM7uPOzcehqUV2UqJxmVXmkdnm1bU=
github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.6.3 h1:ahKqKTFpO5KTPHxWZjEdPScmYaGtLo8Y4DMHoEsnp14=
github.com/gin-gonic/gin v1.6.3/go.mod h1:75u5sXoLsGZoRN5Sgbi1eraJ4GU3++wFwWzhwvtwp4M=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1/go.mod h1:vR7hzQXu2zJy9AVAgeJqvqgH9Q5CA+iKCZ2gyEVpxRU=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20191125211704-12ad95a8df72/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
github.com/go-gl/glfw/v3.3/glfw v0.0.0-20200222043503-6f7a984d4dc4/go.mod h1:tQ2UAYgL5IevRw8kRxooKSPJfGvJ9fJQFa0TUsXzTg8=
@ -225,6 +232,7 @@ github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8l
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
@ -263,6 +271,7 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
@ -304,7 +313,12 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:
github.com/improbable-eng/grpc-web v0.15.0 h1:BN+7z6uNXZ1tQGcNAuaU1YjsLTApzkjt2tzCixLaUPQ=
github.com/improbable-eng/grpc-web v0.15.0/go.mod h1:1sy9HKV4Jt9aEs9JSnkWlRJPuPtwNr0l57L4f878wP8=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/influxdb-client-go/v2 v2.4.0 h1:HGBfZYStlx3Kqvsv1h2pJixbCl/jhnFtxpKFAv9Tu5k=
github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/influxdb1-client v0.0.0-20191209144304-8bf82d3c094d/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs/go-cid v0.2.0 h1:01JTiihFq9en9Vz0lc0VDWvZe/uBonGpzo4THP0vcQ0=
github.com/ipfs/go-cid v0.2.0/go.mod h1:P+HXFDF4CVhaVayiEb4wkAy7zBHxBwsJyt0Y5U6MLro=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
@ -350,6 +364,8 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/leodido/go-urn v1.2.0 h1:hpXL4XnriNwQ/ABnpepYM/1vCLWNDfUNts8dX3xTG6Y=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8=
@ -367,11 +383,17 @@ github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.7.6 h1:8yTIVnZgCoiM1TgqoeTl+LfU5Jg6/xL3QhGQnimLYnA=
github.com/mailru/easyjson v0.7.6/go.mod h1:xzfreul335JAWq5oZzymOObrkdz5UnU4kGfJJLY9Nlc=
github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=
github.com/mattn/go-colorable v0.0.9/go.mod h1:9vuHe8Xs5qXnSaW/c/ABM9alt+Vo+STaOChaDxuIBZU=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
@ -583,6 +605,8 @@ github.com/valyala/fasthttp v1.38.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxn
github.com/valyala/fasthttp v1.40.0/go.mod h1:t/G+3rLek+CyY9bnIE+YlMRddxVAAGjhxndDB4i4C0I=
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20221118153622-cddfe74b6787 h1:DTlEqjjlMddN3py3sGtuqOOtxvhXiFIH9N9nhawC7t4=
@ -640,6 +664,8 @@ golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20211108221036-ceb1ce70b4fa/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
@ -722,6 +748,7 @@ golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwY
golang.org/x/net v0.0.0-20201031054903-ff519b6c9102/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.0.0-20201209123823-ac852fbbde11/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20201224014010-6772e930b67b/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210421230115-4e50805a0758/go.mod h1:72T/g9IO56b78aLF+1Kcs5dz7/ng1VjMUvfKvpfy+jM=
@ -767,6 +794,7 @@ golang.org/x/sys v0.0.0-20181107165924-66b7b1311ac8/go.mod h1:STP8DvDyc/dI5b8T5h
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181122145206-62eef0e2fa9b/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190422165155-953cdadca894/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -775,6 +803,7 @@ golang.org/x/sys v0.0.0-20190507160741-ecd444e8653b/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190826190057-c7b8b68b1456/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -799,6 +828,7 @@ golang.org/x/sys v0.0.0-20200523222454-059865788121/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20200615200032-f1bc736245b1/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200803210538-64077c9b5642/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200905004654-be1d3432aa8f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
@ -825,6 +855,7 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@ -842,6 +873,8 @@ golang.org/x/time v0.0.0-20180412165947-fbb02b2291d2/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190114222345-bf090417da8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=

View File

@ -35,7 +35,6 @@ func (r *Repository) FindByIDs(ctx context.Context, ids []string) ([]*HeartbeatD
requestID := fmt.Sprintf("%v", ctx.Value("requestid"))
r.logger.Error("failed execute Find command to get heartbeats",
zap.Error(err), zap.Strings("ids", ids), zap.String("requestID", requestID))
//zap.Any("q", q)
return nil, errors.WithStack(err)
}
var heartbeats []*HeartbeatDoc

View File

@ -0,0 +1,49 @@
package transactions
import "time"
type TransactionCountQuery struct {
TimeSpan string
SampleRate string
CumulativeSum bool
}
type TransactionCountResult struct {
Time time.Time `json:"time" mapstructure:"_time"`
Count uint64 `json:"count" mapstructure:"count"`
}
type ChainActivityResult struct {
ChainSourceID string `mapstructure:"chain_source_id"`
ChainDestinationID string `mapstructure:"chain_destination_id"`
Volume uint64 `mapstructure:"volume"`
}
type ChainActivityQuery struct {
Start *time.Time
End *time.Time
AppIDs []string
IsNotional bool
}
func (q *ChainActivityQuery) HasAppIDS() bool {
return len(q.AppIDs) > 0
}
func (q *ChainActivityQuery) GetAppIDs() []string {
return q.AppIDs
}
func (q *ChainActivityQuery) GetStart() time.Time {
if q.Start == nil {
return time.UnixMilli(0)
}
return *q.Start
}
func (q *ChainActivityQuery) GetEnd() time.Time {
if q.End == nil {
return time.Now()
}
return *q.End
}

View File

@ -0,0 +1,113 @@
package transactions
import (
"context"
"fmt"
"strings"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/mitchellh/mapstructure"
"go.uber.org/zap"
)
const queryTemplate = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "vaa_volume" and r._field == "volume")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> %s(column: "volume")
`
const queryTemplateWithApps = `
from(bucket: "%s")
|> range(start: %s, stop: %s)
|> filter(fn: (r) => r._measurement == "vaa_volume")
|> filter(fn: (r) => r._field == "volume" or r._field == "app_id")
|> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
|> filter(fn: (r) => contains(value: r.app_id, set: %s))
|> %s(column: "volume")
`
const queryTemplateVaaCount = `
from(bucket: "%s")
|> range(start: -%s)
|> filter(fn: (r) => r["_measurement"] == "vaa_count")
|> group()
|> aggregateWindow(every: %s, fn: count, createEmpty: true)
|> map(fn:(r) => ( {_time: r._time, count: r._value}))
`
type Repository struct {
influxCli influxdb2.Client
queryAPI api.QueryAPI
bucket string
logger *zap.Logger
}
func NewRepository(client influxdb2.Client, org, bucket string, logger *zap.Logger) *Repository {
queryAPI := client.QueryAPI(org)
return &Repository{influxCli: client, queryAPI: queryAPI, bucket: bucket, logger: logger}
}
func (r *Repository) FindChainActivity(ctx context.Context, q *ChainActivityQuery) ([]ChainActivityResult, error) {
query := r.buildFindVolumeQuery(q)
result, err := r.queryAPI.Query(ctx, query)
if err != nil {
return nil, err
}
if result.Err() != nil {
return nil, result.Err()
}
var response []ChainActivityResult
for result.Next() {
var row ChainActivityResult
if err := mapstructure.Decode(result.Record().Values(), &row); err != nil {
return nil, err
}
response = append(response, row)
}
return response, nil
}
func (r *Repository) buildFindVolumeQuery(q *ChainActivityQuery) string {
start := q.GetStart().UTC().Format(time.RFC3339)
stop := q.GetEnd().UTC().Format(time.RFC3339)
var operation string
if q.IsNotional {
operation = "sum"
} else {
operation = "count"
}
if q.HasAppIDS() {
apps := `["` + strings.Join(q.GetAppIDs(), `","`) + `"]`
return fmt.Sprintf(queryTemplateWithApps, r.bucket, start, stop, apps, operation)
}
return fmt.Sprintf(queryTemplate, r.bucket, start, stop, operation)
}
// GetTransactionCount get the last transactions.
func (r *Repository) GetTransactionCount(ctx context.Context, q *TransactionCountQuery) ([]TransactionCountResult, error) {
query := r.buildLastTrxQuery(q)
result, err := r.queryAPI.Query(ctx, query)
if err != nil {
return nil, err
}
if result.Err() != nil {
return nil, result.Err()
}
response := []TransactionCountResult{}
for result.Next() {
var row TransactionCountResult
if err := mapstructure.Decode(result.Record().Values(), &row); err != nil {
return nil, err
}
response = append(response, row)
}
return response, nil
}
func (r *Repository) buildLastTrxQuery(q *TransactionCountQuery) string {
return fmt.Sprintf(queryTemplateVaaCount, r.bucket, q.TimeSpan, q.SampleRate)
}

View File

@ -0,0 +1,27 @@
package transactions
import (
"context"
"go.uber.org/zap"
)
type Service struct {
repo *Repository
logger *zap.Logger
}
// NewService create a new Service.
func NewService(repo *Repository, logger *zap.Logger) *Service {
return &Service{repo: repo, logger: logger.With(zap.String("module", "TransactionService"))}
}
// GetTransactionCount get the last transactions.
func (s *Service) GetTransactionCount(ctx context.Context, q *TransactionCountQuery) ([]TransactionCountResult, error) {
return s.repo.GetTransactionCount(ctx, q)
}
// GetChainActivity get chain activity.
func (s *Service) GetChainActivity(ctx context.Context, q *ChainActivityQuery) ([]ChainActivityResult, error) {
return s.repo.FindChainActivity(ctx, q)
}

View File

@ -40,6 +40,12 @@ type AppConfig struct {
RunMode string
P2pNetwork string
PprofEnabled bool
Influx struct {
URL string
Token string
Organization string
Bucket string
}
}
// GetLogLevel get zapcore.Level define in the configuraion.

View File

@ -17,11 +17,13 @@ import (
"github.com/gofiber/fiber/v2/middleware/requestid"
"github.com/improbable-eng/grpc-web/go/grpcweb"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/governor"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/heartbeats"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
wormscanCache "github.com/wormhole-foundation/wormhole-explorer/api/internal/cache"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/config"
@ -102,12 +104,16 @@ func main() {
// Get cache get function
cacheGetFunc := NewCache(cfg, rootLogger)
//InfluxDB client
influxCli := newInfluxClient(cfg.Influx.URL, cfg.Influx.Token)
// Set up repositories
vaaRepo := vaa.NewRepository(db, rootLogger)
obsRepo := observations.NewRepository(db, rootLogger)
governorRepo := governor.NewRepository(db, rootLogger)
infrastructureRepo := infrastructure.NewRepository(db, rootLogger)
heartbeatsRepo := heartbeats.NewRepository(db, rootLogger)
transactionsRepo := transactions.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket, rootLogger)
// Set up services
vaaService := vaa.NewService(vaaRepo, cacheGetFunc, rootLogger)
@ -115,6 +121,7 @@ func main() {
governorService := governor.NewService(governorRepo, rootLogger)
infrastructureService := infrastructure.NewService(infrastructureRepo, rootLogger)
heartbeatsService := heartbeats.NewService(heartbeatsRepo, rootLogger)
transactionsService := transactions.NewService(transactionsRepo, rootLogger)
// Set up a custom error handler
response.SetEnableStackTrace(*cfg)
@ -135,7 +142,7 @@ func main() {
// Set up route handlers
app.Get("/swagger.json", GetSwagger)
wormscan.RegisterRoutes(app, rootLogger, vaaService, obsService, governorService, infrastructureService)
wormscan.RegisterRoutes(app, rootLogger, vaaService, obsService, governorService, infrastructureService, transactionsService)
guardian.RegisterRoutes(cfg, app, rootLogger, vaaService, governorService, heartbeatsService)
// Set up gRPC handlers
@ -165,3 +172,7 @@ func NewCache(cfg *config.AppConfig, looger *zap.Logger) wormscanCache.CacheGetF
cacheClient := wormscanCache.NewCacheClient(cfg.Cache.URL, cfg.Cache.Enabled, looger)
return cacheClient.Get
}
func newInfluxClient(url, token string) influxdb2.Client {
return influxdb2.NewClient(url, token)
}

View File

@ -3,7 +3,10 @@ package middleware
import (
"fmt"
"regexp"
"strconv"
"strings"
"time"
"github.com/gofiber/fiber/v2"
"github.com/pkg/errors"
@ -197,3 +200,75 @@ func ExtractParsedPayload(c *fiber.Ctx, l *zap.Logger) (bool, error) {
func ExtractAppId(c *fiber.Ctx, l *zap.Logger) string {
return c.Query("appId")
}
func ExtractTimeSpan(c *fiber.Ctx, l *zap.Logger) (string, error) {
// get the timeSpan from query params
timeSpanStr := c.Query("timeSpan", "1h")
if timeSpanStr == "all" {
return timeSpanStr, nil
}
// validate the timeSpan
if !isValidTimeSpan(timeSpanStr) {
return "", response.NewInvalidQueryParamError(c, "INVALID <timeSpan> QUERY PARAMETER", nil)
}
return timeSpanStr, nil
}
// isValidTimeSpan check if the timeSpan is valid
func isValidTimeSpan(timeSpan string) bool {
return regexp.MustCompile(`^all$|^\d+[mhdwy]$|^\dmo$`).MatchString(timeSpan)
}
func ExtractSampleRate(c *fiber.Ctx, l *zap.Logger) (string, error) {
// get the sampleRate from query params
sampleRateStr := c.Query("sampleRate", "1m")
if sampleRateStr == "1y" {
return sampleRateStr, nil
}
// validate the sampleRate
if !isValidSampleRate(sampleRateStr) {
return "", response.NewInvalidQueryParamError(c, "INVALID <sampleRate> QUERY PARAMETER", nil)
}
return sampleRateStr, nil
}
func isValidSampleRate(sampleRate string) bool {
return regexp.MustCompile(`^\d+[smhdwy]$|^\dmo$`).MatchString(sampleRate)
}
func ExtractTime(c *fiber.Ctx, queryParam string) (*time.Time, error) {
// get the start_time from query params
date := c.Query(queryParam, "")
if date == "" {
return nil, nil
}
t, err := time.Parse("20060102T150405Z", date)
if err != nil {
return nil, response.NewInvalidQueryParamError(c, fmt.Sprintf("INVALID <%s> QUERY PARAMETER", queryParam), nil)
}
return &t, nil
}
func ExtractApps(ctx *fiber.Ctx) ([]string, error) {
apps := ctx.Query("apps")
if apps == "" {
return nil, nil
}
return strings.Split(apps, ","), nil
}
func ExtractIsNotional(ctx *fiber.Ctx) (bool, error) {
by := ctx.Query("by")
if by == "" {
return true, nil
}
if by == "notional" {
return true, nil
}
if by == "tx" {
return false, nil
}
return false, response.NewInvalidQueryParamError(ctx, "INVALID <by> QUERY PARAMETER", nil)
}

View File

@ -9,10 +9,12 @@ import (
govsvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/governor"
infrasvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/infrastructure"
obssvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/observations"
trxsvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
vaasvc "github.com/wormhole-foundation/wormhole-explorer/api/handlers/vaa"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/governor"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/observations"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/routes/wormscan/vaa"
"go.uber.org/zap"
)
@ -34,6 +36,7 @@ func RegisterRoutes(
obsService *obssvc.Service,
governorService *govsvc.Service,
infrastructureService *infrasvc.Service,
transactionsService *trxsvc.Service,
) {
// Set up controllers
@ -41,6 +44,7 @@ func RegisterRoutes(
observationsCtrl := observations.NewController(obsService, rootLogger)
governorCtrl := governor.NewController(governorService, rootLogger)
infrastructureCtrl := infrastructure.NewController(infrastructureService)
transactionCtrl := transactions.NewController(transactionsService, rootLogger)
// Set up route handlers
api := app.Group("/api/v1")
@ -51,6 +55,10 @@ func RegisterRoutes(
api.Get("/ready", infrastructureCtrl.ReadyCheck)
api.Get("/version", infrastructureCtrl.Version)
// analytics
api.Get("/last-txs", transactionCtrl.GetLastTransactions)
api.Get("/x-chain-activity", transactionCtrl.GetChainActivity)
// vaas resource
vaas := api.Group("/vaas")
vaas.Use(cache.New(cacheConfig))

View File

@ -0,0 +1,146 @@
package transactions
import (
"strconv"
"github.com/gofiber/fiber/v2"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/transactions"
"github.com/wormhole-foundation/wormhole-explorer/api/middleware"
"go.uber.org/zap"
)
// Controller is the controller for the transactions resource.
type Controller struct {
srv *transactions.Service
logger *zap.Logger
}
// NewController create a new controler.
func NewController(transactionsService *transactions.Service, logger *zap.Logger) *Controller {
return &Controller{
srv: transactionsService,
logger: logger.With(zap.String("module", "TransactionsController")),
}
}
// GetLastTransactions godoc
// @Description Returns the number of transactions [vaa] by a defined time span and sample rate.
// @Tags Wormscan
// @ID get-last-transactions
// @Param timeSpan query string false "Time Span, default: 1h, examples: 30m, 1h, 1d, 2w, 3mo, 1y, all."
// @Param sampleRate query string false "Sample Rate, default: 1m, examples: 30s, 1m, 1h, 1d, 2w, 3mo, 1y."
// @Success 200 {object} []transactions.TransactionCountResult
// @Failure 400
// @Failure 500
// @Router /api/v1/last-txs [get]
func (c *Controller) GetLastTransactions(ctx *fiber.Ctx) error {
timeSpan, err := middleware.ExtractTimeSpan(ctx, c.logger)
if err != nil {
return err
}
sampleRate, err := middleware.ExtractSampleRate(ctx, c.logger)
if err != nil {
return err
}
q := &transactions.TransactionCountQuery{
TimeSpan: timeSpan,
SampleRate: sampleRate,
}
// Get transaction count.
lastTrx, err := c.srv.GetTransactionCount(ctx.Context(), q)
if err != nil {
return err
}
return ctx.JSON(lastTrx)
}
// GetChainActivity godoc
// @Description Returns a list of tx by source chain and destination chain.
// @Tags Wormscan
// @ID x-chain-activity
// @Param start_time query string false "Star time (format: ISO-8601)."
// @Param end_time query string false "End time (format: ISO-8601)."
// @Param by query string false "Renders the results as notional or tx-count (default is notional)."
// @Param apps query string false "List of apps separated by comma (default is all apps)."
// @Success 200 {object} transactions.ChainActivity
// @Failure 400
// @Failure 500
// @Router /api/v1/x-chain-activity [get]
func (c *Controller) GetChainActivity(ctx *fiber.Ctx) error {
startTime, err := middleware.ExtractTime(ctx, "start_time")
if err != nil {
return err
}
endTime, err := middleware.ExtractTime(ctx, "end_time")
if err != nil {
return err
}
apps, err := middleware.ExtractApps(ctx)
if err != nil {
return err
}
isNotional, err := middleware.ExtractIsNotional(ctx)
if err != nil {
return err
}
q := &transactions.ChainActivityQuery{
Start: startTime,
End: endTime,
AppIDs: apps,
IsNotional: isNotional,
}
// Get the chain activity.
activity, err := c.srv.GetChainActivity(ctx.Context(), q)
if err != nil {
c.logger.Error("Error getting chain activity", zap.Error(err))
return err
}
// Convert the result to the expected format.
txByChainID := make(map[int]*Tx)
total := uint64(0)
for _, item := range activity {
chainSourceID, err := strconv.Atoi(item.ChainSourceID)
if err != nil {
c.logger.Error("Error during conversion of chainSourceId", zap.Error(err))
return err
}
t, ok := txByChainID[chainSourceID]
if !ok {
destinations := make([]Destination, 0)
t = &Tx{Chain: chainSourceID, Volume: 0, Percentage: 0, Destinations: destinations}
}
chainDestinationID, err := strconv.Atoi(item.ChainDestinationID)
if err != nil {
c.logger.Error("Error during conversion of chainDestinationId", zap.Error(err))
return err
}
destination := Destination{Chain: chainDestinationID, Volume: item.Volume, Percentage: 0}
t.Destinations = append(t.Destinations, destination)
t.Volume += item.Volume
txByChainID[chainSourceID] = t
total += item.Volume
}
txs := make([]Tx, 0)
for _, item := range txByChainID {
if total > 0 {
percentage := float64(item.Volume*100) / float64(total)
item.Percentage = percentage
}
for i, destination := range item.Destinations {
if item.Volume > 0 {
item.Destinations[i].Percentage = float64(destination.Volume*100) / float64(item.Volume)
}
}
txs = append(txs, *item)
}
return ctx.JSON(ChainActivity{Txs: txs})
}

View File

@ -0,0 +1,19 @@
package transactions
type Tx struct {
Chain int `json:"chain"`
Volume uint64 `json:"volume"`
Percentage float64 `json:"percentage"`
Destinations []Destination `json:"destinations"`
}
type Destination struct {
Chain int `json:"chain"`
Volume uint64 `json:"volume"`
Percentage float64 `json:"percentage"`
}
// ChainActivity represent a cross chain activity.
type ChainActivity struct {
Txs []Tx `json:"txs"`
}

View File

@ -0,0 +1,90 @@
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: {{ .NAME }}
namespace: {{ .NAMESPACE }}
spec:
replicas: {{ .REPLICAS }}
selector:
matchLabels:
app: {{ .NAME }}
template:
metadata:
labels:
app: {{ .NAME }}
spec:
restartPolicy: Always
terminationGracePeriodSeconds: 40
containers:
- name: {{ .NAME }}
image: {{ .IMAGE_NAME }}
imagePullPolicy: Always
readinessProbe:
initialDelaySeconds: 30
periodSeconds: 20
timeoutSeconds: 3
failureThreshold: 3
httpGet:
path: /api/ready
port: 8000
livenessProbe:
initialDelaySeconds: 30
periodSeconds: 30
timeoutSeconds: 3
failureThreshold: 3
httpGet:
path: /api/health
port: 8000
env:
- name: ENV
value: "PRODUCTION"
- name: PORT
value: "8000"
- name: LOG_LEVEL
value: "INFO"
- name: SQS_URL
value: {{ .SQS_URL }}
- name: AWS_REGION
value: {{ .SQS_AWS_REGION }}
- name: AWS_ACCESS_KEY_ID
valueFrom:
secretKeyRef:
name: analytic-sqs
key: aws-access-key-id
- name: AWS_SECRET_ACCESS_KEY
valueFrom:
secretKeyRef:
name: analytic-sqs
key: aws-secret-access-key
- name: PPROF_ENABLED
value: "{{ .PPROF_ENABLED }}"
- name: P2P_NETWORK
value: {{ .P2P_NETWORK }}
- name: INFLUX_URL
valueFrom:
configMapKeyRef:
name: config
key: influxdb-url
- name: INFLUX_TOKEN
valueFrom:
secretKeyRef:
name: influxdb
key: token
- name: INFLUX_ORGANIZATION
valueFrom:
configMapKeyRef:
name: config
key: influxdb-organization
- name: INFLUX_BUCKET
valueFrom:
configMapKeyRef:
name: config
key: influxdb-bucket
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}
cpu: {{ .RESOURCES_LIMITS_CPU }}
requests:
memory: {{ .RESOURCES_REQUESTS_MEMORY }}
cpu: {{ .RESOURCES_REQUESTS_CPU }}

15
deploy/analytic/env/production.env vendored Normal file
View File

@ -0,0 +1,15 @@
ENVIRONMENT=production
NAMESPACE=wormscan
NAME=wormscan-analytic
REPLICAS=2
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=256Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=128Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
SQS_AWS_REGION=
SQS_ACCESS_KEY_ID=
SQS_SECRET_ACCESS_KEY=
P2P_NETWORK=mainnet
PPROF_ENABLED=false

15
deploy/analytic/env/staging.env vendored Normal file
View File

@ -0,0 +1,15 @@
ENVIRONMENT=staging
NAMESPACE=wormscan
NAME=wormscan-analytic
REPLICAS=2
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=128Mi
RESOURCES_LIMITS_CPU=500m
RESOURCES_REQUESTS_MEMORY=64Mi
RESOURCES_REQUESTS_CPU=250m
SQS_URL=
SQS_AWS_REGION=
SQS_ACCESS_KEY_ID=
SQS_SECRET_ACCESS_KEY=
P2P_NETWORK=mainnet
PPROF_ENABLED=true

15
deploy/analytic/env/test.env vendored Normal file
View File

@ -0,0 +1,15 @@
ENVIRONMENT=test
NAMESPACE=wormscan-testnet
NAME=wormscan-analytic
REPLICAS=1
IMAGE_NAME=
RESOURCES_LIMITS_MEMORY=128Mi
RESOURCES_LIMITS_CPU=200m
RESOURCES_REQUESTS_MEMORY=64Mi
RESOURCES_REQUESTS_CPU=100m
SQS_URL=
SQS_AWS_REGION=
SQS_ACCESS_KEY_ID=
SQS_SECRET_ACCESS_KEY=
P2P_NETWORK=testnet
PPROF_ENABLED=false

View File

@ -0,0 +1,11 @@
---
kind: Secret
apiVersion: v1
metadata:
name: analytic-sqs
namespace: {{ .NAMESPACE }}
data:
aws-access-key-id: {{ .SQS_ACCESS_KEY_ID }}
aws-secret-access-key: {{ .SQS_SECRET_ACCESS_KEY }}
type: Opaque

View File

@ -80,6 +80,26 @@ spec:
value: "true"
- name: WORMSCAN_PPROF_ENABLED
value: "{{ .WORMSCAN_PPROF_ENABLED }}"
- name: WORMSCAN_INFLUX_URL
valueFrom:
configMapKeyRef:
name: config
key: influxdb-url
- name: WORMSCAN_INFLUX_TOKEN
valueFrom:
secretKeyRef:
name: influxdb
key: token
- name: WORMSCAN_INFLUX_ORGANIZATION
valueFrom:
configMapKeyRef:
name: config
key: influxdb-organization
- name: WORMSCAN_INFLUX_BUCKET
valueFrom:
configMapKeyRef:
name: config
key: influxdb-bucket
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -6,4 +6,7 @@ metadata:
namespace: {{ .NAMESPACE }}
data:
mongo-database: {{ .MONGODB_DATABASE }}
redis-uri: {{ .REDIS_URI }}
redis-uri: {{ .REDIS_URI }}
influxdb-url: {{ .INFLUX_URL }}
influxdb-organization: {{ .INFLUX_ORGANIZATION }}
influxdb-bucket: {{ .INFLUX_BUCKET }}

View File

@ -2,4 +2,8 @@ ENVIRONMENT=production
NAMESPACE=wormscan
MONGODB_URI=
MONGODB_DATABASE=
REDIS_URI=
REDIS_URI=
INFLUX_URL=
INFLUX_TOKEN=
INFLUX_ORGANIZATION=
INFLUX_BUCKET=

View File

@ -2,4 +2,8 @@ ENVIRONMENT=staging
NAMESPACE=wormscan
MONGODB_URI=
MONGODB_DATABASE=
REDIS_URI=
REDIS_URI=
INFLUX_URL=
INFLUX_TOKEN=
INFLUX_ORGANIZATION=
INFLUX_BUCKET=

View File

@ -2,4 +2,8 @@ ENVIRONMENT=test
NAMESPACE=wormscan-testnet
MONGODB_URI=
MONGODB_DATABASE=
REDIS_URI=
REDIS_URI=
INFLUX_URL=
INFLUX_TOKEN=
INFLUX_ORGANIZATION=
INFLUX_BUCKET=

View File

@ -6,5 +6,13 @@ metadata:
namespace: {{ .NAMESPACE }}
data:
mongo-uri: {{ .MONGODB_URI }}
type: Opaque
---
kind: Secret
apiVersion: v1
metadata:
name: influxdb
namespace: {{ .NAMESPACE }}
data:
token: {{ .INFLUX_TOKEN }}
type: Opaque

View File

@ -75,6 +75,26 @@ spec:
value: "{{ .PPROF_ENABLED }}"
- name: P2P_NETWORK
value: {{ .P2P_NETWORK }}
- name: INFLUX_URL
valueFrom:
configMapKeyRef:
name: config
key: influxdb-url
- name: INFLUX_TOKEN
valueFrom:
secretKeyRef:
name: influxdb
key: token
- name: INFLUX_ORGANIZATION
valueFrom:
configMapKeyRef:
name: config
key: influxdb-organization
- name: INFLUX_BUCKET
valueFrom:
configMapKeyRef:
name: config
key: influxdb-bucket
resources:
limits:
memory: {{ .RESOURCES_LIMITS_MEMORY }}

View File

@ -3,10 +3,11 @@ FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a
WORKDIR /app
COPY . .
COPY fly fly
COPY common common
# Build the Go app
RUN CGO_ENABLED=1 GOOS=linux go build -o "./fly" main.go && \
RUN cd fly && CGO_ENABLED=1 GOOS=linux go build -o "./fly" main.go && \
go get github.com/CosmWasm/wasmvm@v1.0.0 && \
cp /go/pkg/mod/github.com/!cosm!wasm/wasmvm@v1.0.0/api/libwasmvm.x86_64.so /usr/lib/
@ -22,7 +23,7 @@ COPY --from=build /lib/* /lib/
COPY --from=build /lib64/* /lib64/
COPY --from=build /usr/lib/libwasmvm.x86_64.so /usr/lib/
#Copy our static executable.
COPY --from=build "/app/fly" "/fly"
COPY --from=build "/app/fly/fly" "/fly"
# Run the binary.
ENTRYPOINT ["/fly"]

View File

@ -3,10 +3,11 @@ FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a
WORKDIR /app
COPY . .
COPY parser parser
COPY common common
# Build the Go app
RUN CGO_ENABLED=0 GOOS=linux go build -o "./parser-pipeline" cmd/main.go
RUN cd parser && CGO_ENABLED=0 GOOS=linux go build -o "./parser-pipeline" cmd/main.go
############################
# STEP 2 build a small image
@ -15,6 +16,6 @@ FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/parser-pipeline" "/parser-pipeline"
COPY --from=build "/app/parser/parser-pipeline" "/parser-pipeline"
# Run the binary.
ENTRYPOINT ["/parser-pipeline"]

View File

@ -10,13 +10,16 @@ import (
"github.com/aws/aws-sdk-go-v2/aws"
awsconfig "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
ipfslog "github.com/ipfs/go-log/v2"
"github.com/wormhole-foundation/wormhole-explorer/parser/config"
"github.com/wormhole-foundation/wormhole-explorer/parser/consumer"
"github.com/wormhole-foundation/wormhole-explorer/parser/http/infrastructure"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/db"
"github.com/wormhole-foundation/wormhole-explorer/parser/internal/sqs"
"github.com/wormhole-foundation/wormhole-explorer/parser/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
"go.uber.org/zap"
)
@ -68,8 +71,13 @@ func main() {
sqsConsumer, vaaConsumeFunc := newVAAConsume(rootCtx, config, logger)
repository := parser.NewRepository(db.Database, logger)
//create a processor
influxCli := newInfluxClient(config.InfluxUrl, config.InfluxToken)
metrics := metrics.New(influxCli, config.InfluxOrg, config.InfluxBucket, logger)
processor := processor.New(repository, metrics, logger)
// create and start a consumer
consumer := consumer.New(vaaConsumeFunc, repository, parserVAAAPIClient, logger)
consumer := consumer.New(vaaConsumeFunc, processor.Process, parserVAAAPIClient, logger)
consumer.Start(rootCtx)
server := infrastructure.NewServer(logger, config.Port, config.PprofEnabled, config.IsQueueConsumer(), sqsConsumer, db.Database)
@ -149,3 +157,7 @@ func newFilterFunc(cfg *config.Configuration) queue.FilterConsumeFunc {
}
return queue.NonFilter
}
func newInfluxClient(url, token string) influxdb2.Client {
return influxdb2.NewClient(url, token)
}

View File

@ -31,6 +31,10 @@ type Configuration struct {
VaaPayloadParserTimeout int64 `env:"VAA_PAYLOAD_PARSER_TIMEOUT, required"`
PprofEnabled bool `env:"PPROF_ENABLED,default=false"`
P2pNetwork string `env:"P2P_NETWORK,required"`
InfluxUrl string `env:"INFLUX_URL,required"`
InfluxToken string `env:"INFLUX_TOKEN,required"`
InfluxOrg string `env:"INFLUX_ORGANIZATION,required"`
InfluxBucket string `env:"INFLUX_BUCKET,required"`
}
// New creates a configuration with the values from .env file and environment variables.

View File

@ -6,6 +6,7 @@ import (
"time"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"github.com/wormhole-foundation/wormhole-explorer/parser/processor"
"github.com/wormhole-foundation/wormhole-explorer/parser/queue"
"github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
@ -13,15 +14,15 @@ import (
// Consumer consumer struct definition.
type Consumer struct {
consume queue.VAAConsumeFunc
repository *parser.Repository
parser parser.ParserVAAAPIClient
logger *zap.Logger
consume queue.VAAConsumeFunc
process processor.ProcessorFunc
parser parser.ParserVAAAPIClient
logger *zap.Logger
}
// New creates a new vaa consumer.
func New(consume queue.VAAConsumeFunc, repository *parser.Repository, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer {
return &Consumer{consume: consume, repository: repository, parser: parser, logger: logger}
func New(consume queue.VAAConsumeFunc, process processor.ProcessorFunc, parser parser.ParserVAAAPIClient, logger *zap.Logger) *Consumer {
return &Consumer{consume: consume, process: process, parser: parser, logger: logger}
}
// Start consumes messages from VAA queue, parse and store those messages in a repository.
@ -70,19 +71,19 @@ func (c *Consumer) Start(ctx context.Context) {
Sequence: event.Sequence,
AppID: vaaParseResponse.AppID,
Result: vaaParseResponse.Result,
Timestamp: vaa.Timestamp,
UpdatedAt: &now,
}
err = c.repository.UpsertParsedVaa(ctx, vaaParsed)
err = c.process(ctx, &vaaParsed)
if err != nil {
c.logger.Error("Error inserting vaa in repository",
c.logger.Error("Error processing parsed vaa",
zap.String("id", event.ID),
zap.Error(err))
msg.Failed()
continue
}
msg.Done()
c.logger.Info("Vaa save in repository", zap.String("id", event.ID))
}
}()
}

View File

@ -19,6 +19,8 @@ require (
github.com/aws/aws-sdk-go-v2/config v1.1.1
github.com/aws/aws-sdk-go-v2/credentials v1.1.1
github.com/aws/aws-sdk-go-v2/service/sqs v1.20.1
github.com/influxdata/influxdb-client-go/v2 v2.4.0
github.com/mitchellh/mapstructure v1.5.0
)
require (
@ -34,9 +36,11 @@ require (
github.com/benbjohnson/clock v1.3.0 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 // indirect
github.com/deepmap/oapi-codegen v1.8.2 // indirect
github.com/ethereum/go-ethereum v1.10.21 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/go-cmp v0.5.9 // indirect
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 // indirect
github.com/klauspost/compress v1.15.11 // indirect
github.com/kr/pretty v0.3.0 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
@ -56,9 +60,11 @@ require (
go.uber.org/goleak v1.1.12 // indirect
go.uber.org/multierr v1.8.0 // indirect
golang.org/x/crypto v0.1.0 // indirect
golang.org/x/net v0.1.0 // indirect
golang.org/x/sync v0.1.0 // indirect
golang.org/x/sys v0.2.0 // indirect
golang.org/x/text v0.4.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
// Needed for cosmos-sdk based chains. See

View File

@ -33,25 +33,43 @@ github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPx
github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU=
github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/cyberdelia/templates v0.0.0-20141128023046-ca7fffd4298c/go.mod h1:GyV+0YP4qX0UQ7r2MoYZ+AvYDp12OF5yg4q8rGnyNh4=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK09Y2A4Xv7EE0=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0 h1:HbphB4TFFXpv7MNrT52FGrrgVXF1owhMVTHFZIlnvd4=
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.1.0/go.mod h1:DZGJHZMqrU4JJqFAWUS2UO1+lbSKsdiOoYi9Zzey7Fc=
github.com/deepmap/oapi-codegen v1.6.0/go.mod h1:ryDa9AgbELGeB+YEXE1dR53yAjHwFvE9iAUlWl9Al3M=
github.com/deepmap/oapi-codegen v1.8.2 h1:SegyeYGcdi0jLLrpbCMoJxnUUn8GBXHsvr4rbzjuhfU=
github.com/deepmap/oapi-codegen v1.8.2/go.mod h1:YLgSKSDv/bZQB7N4ws6luhozi3cEdRktEqrX88CvjIw=
github.com/dgrijalva/jwt-go v3.2.0+incompatible/go.mod h1:E3ru+11k8xSBh+hMPgOLZmtrrCbhqsmaPHjLKYnJCaQ=
github.com/ethereum/go-ethereum v1.10.21 h1:5lqsEx92ZaZzRyOqBEXux4/UR06m296RGzN3ol3teJY=
github.com/ethereum/go-ethereum v1.10.21/go.mod h1:EYFyF19u3ezGLD4RqOkLq+ZCXzYbLoNDdZlMt7kyKFg=
github.com/getkin/kin-openapi v0.53.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/getkin/kin-openapi v0.61.0/go.mod h1:7Yn5whZr5kJi6t+kShccXS8ae1APpYTW6yheSwk8Yi4=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/go-chi/chi/v5 v5.0.0/go.mod h1:BBug9lr0cqtdAhsu6R4AAdvufI0/XBzAQSsUqJpoZOs=
github.com/go-openapi/jsonpointer v0.19.5/go.mod h1:Pl9vOtqEWErmShwVjC8pYs9cog34VGT37dQOVbmoatg=
github.com/go-openapi/swag v0.19.5/go.mod h1:POnQmlKehdgb5mhVOsnJFsivZCEZ/vjK9gh66Z9tfKk=
github.com/gofiber/fiber/v2 v2.40.1 h1:pc7n9VVpGIqNsvg9IPLQhyFEMJL8gCs1kneH5D1pIl4=
github.com/gofiber/fiber/v2 v2.40.1/go.mod h1:Gko04sLksnHbzLSRBFWPFdzM9Ws9pRxvvIaohJK1dsk=
github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/google/go-cmp v0.4.1/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
github.com/google/go-cmp v0.5.8/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/influxdata/influxdb-client-go/v2 v2.4.0 h1:HGBfZYStlx3Kqvsv1h2pJixbCl/jhnFtxpKFAv9Tu5k=
github.com/influxdata/influxdb-client-go/v2 v2.4.0/go.mod h1:vLNHdxTJkIf2mSLvGrpj8TCcISApPoXkaxP8g9uRlW8=
github.com/influxdata/line-protocol v0.0.0-20200327222509-2487e7298839/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097 h1:vilfsDSy7TDxedi9gyBkMvAirat/oRcL0lFdJBf6tdM=
github.com/influxdata/line-protocol v0.0.0-20210311194329-9aa0e372d097/go.mod h1:xaLFMmpvUxqXtVkUJfg9QmT88cDaCJ3ZKgdZ78oO8Qo=
github.com/ipfs/go-log/v2 v2.5.1 h1:1XdUzF7048prq4aBjDQQ4SL5RxftpRGdXhNRwKSAlcY=
github.com/ipfs/go-log/v2 v2.5.1/go.mod h1:prSpmC1Gpllc9UYWxDiZDreBYw7zp4Iqp1kOLU9U5UI=
github.com/jmespath/go-jmespath v0.4.0/go.mod h1:T8mJZnbsbmF+m6zOOFylbeCJqk5+pHWvzYPziyZiYoo=
@ -69,13 +87,26 @@ github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/labstack/echo/v4 v4.2.1/go.mod h1:AA49e0DZ8kk5jTOOCKNuPR6oTnBS0dYiM4FW1e6jwpg=
github.com/labstack/gommon v0.3.0/go.mod h1:MULnywXg0yavhxWKc+lOruYdAhDwPK9wf0OL7NoOu+k=
github.com/mailru/easyjson v0.0.0-20190614124828-94de47d64c63/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/mailru/easyjson v0.0.0-20190626092158-b2ccc519800e/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
github.com/matryer/moq v0.0.0-20190312154309-6cfb0558e1bd/go.mod h1:9ELz6aaclSIGnZBoaSLZ3NAl1VTufbOrXBPvtcy6WiQ=
github.com/mattn/go-colorable v0.1.2/go.mod h1:U0ppj6V5qS13XJ6of8GYAs25YV2eR4EVcfRqFIhoBtE=
github.com/mattn/go-colorable v0.1.7/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.8/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.9/go.mod h1:YNRxwqDuOph6SZLI9vUUz6OYw3QyUt7WiY2yME+cCiQ=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-isatty v0.0.16 h1:bq3VjFmv/sOjHtdEhmkEV4x1AJtvUvOJ2PFAZ5+peKQ=
github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
github.com/mattn/go-runewidth v0.0.14 h1:+xnbZSEeDbOIg5/mE6JF0w6n9duR1l3/WmbinWVwUuU=
github.com/mattn/go-runewidth v0.0.14/go.mod h1:Jdepj2loyihRzMpdS35Xk/zdY8IAYHsh153qUoGf23w=
github.com/mitchellh/mapstructure v1.5.0 h1:jeMsZIYE/09sWLaz43PL7Gy6RuMjD2eJVyuac5Z2hdY=
github.com/mitchellh/mapstructure v1.5.0/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RRV2QTWOzhPopBRo=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0=
github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc=
github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
@ -93,6 +124,8 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
@ -106,6 +139,8 @@ github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6Kllzaw
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasthttp v1.41.0 h1:zeR0Z1my1wDHTRiamBCXVglQdbUwgb9uWG3k1HQz6jY=
github.com/valyala/fasthttp v1.41.0/go.mod h1:f6VbjjoI3z1NDOZOv17o6RvtRSWxC77seBFc2uWtgiY=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ=
github.com/valyala/tcplisten v1.0.0 h1:rBHj/Xf+E1tRGZyWIWwJDiRY0zc1Js+CV5DqwacVSA8=
github.com/valyala/tcplisten v1.0.0/go.mod h1:T0xQ8SeCZGxckz9qRXTfG43PvQ/mcWh7FwZEA7Ioqkc=
github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230123141139-45b3d18d80b2 h1:we8iat9DdKt8V6aopxFe+2PkKol9cDPYwH8xZue0R60=
@ -135,6 +170,9 @@ go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY=
go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200820211705-5c72a883971a/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20201221181555-eec23a3978ad/go.mod h1:jdWPYTVW3xRLrWPugEBEK3UY2ZEsg3UU495nc5E+M+I=
golang.org/x/crypto v0.0.0-20220214200702-86341886e292/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/crypto v0.1.0 h1:MDRAIl0xIo9Io2xV565hzXHw3zVseKrJKodhohM5CjU=
@ -144,16 +182,28 @@ golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200822124328-c89045814202/go.mod h1:/O7V0waA8r7cgGh81Ro3o1hOxt32SMVPicZroKQ2sZA=
golang.org/x/net v0.0.0-20210119194325-5f4716e94777/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220906165146-f3363e06e74c/go.mod h1:YDH+HFinaLZZlnHAfSS6ZXJJ9M9t4Dl22yv3iI2vPwk=
golang.org/x/net v0.1.0 h1:hZ/3BUoy5aId7sCpA/Tc5lt8DkFgdVS2onTpJsZ/fl0=
golang.org/x/net v0.1.0/go.mod h1:Cx3nUiGt4eDBEyega/BKRp+/AlGL8hYe7U9odMt2Cco=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190222072716-a9d3bda3a223/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20190813064441-fde4db37ae7a/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200223170610-d5e6a3e2c0ae/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200826173525-f9321e4c35a6/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
@ -163,17 +213,22 @@ golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.2.0 h1:ljd4t30dBnAvMZaQCevtY0xLLD0A+bRZXbgLMLU1F/A=
golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.5/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ=
golang.org/x/text v0.4.0 h1:BrVqGRd7+k1DiOgtnFvAkoQEWQvBc25ouMJM6429SFg=
golang.org/x/text v0.4.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210220033141-f8bda1e9f3ba/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20190311212946-11955173bddd/go.mod h1:LCzVGOaR6xXOjkQ3onu1FJEFr0SW1gC7cKk1uF8kGRs=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@ -181,8 +236,13 @@ golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8T
golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.4.0 h1:D8xgwECY7CYvx+Y2n4sBz93Jn9JRvxdiyyo8CTfuKaY=
gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=

50
parser/metrics/metrics.go Normal file
View File

@ -0,0 +1,50 @@
package metrics
import (
"context"
"fmt"
"time"
influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
"go.uber.org/zap"
)
const measurement = "vaa_volume"
// Metric definition.
type Metrics struct {
influxCli influxdb2.Client
writeApi api.WriteAPIBlocking
logger *zap.Logger
}
type Volume struct {
ChainSourceID uint16
ChainDestinationID uint16
Value uint64
Timestamp time.Time
AppID string
}
// New create a new *Metric
func New(influxCli influxdb2.Client, organization, bucket string, logger *zap.Logger) *Metrics {
writeAPI := influxCli.WriteAPIBlocking(organization, bucket)
return &Metrics{influxCli: influxCli, writeApi: writeAPI, logger: logger}
}
func (m *Metrics) PushVolume(ctx context.Context, v *Volume) error {
point := influxdb2.NewPointWithMeasurement(measurement).
AddTag("chain_source_id", fmt.Sprintf("%d", v.ChainSourceID)).
AddTag("chain_destination_id", fmt.Sprintf("%d", v.ChainDestinationID)).
AddField("volume", v.Value).
AddField("app_id", v.AppID).
SetTime(v.Timestamp)
// write point to influx
err := m.writeApi.WritePoint(ctx, point)
if err != nil {
return err
}
return nil
}

View File

@ -13,4 +13,5 @@ type ParsedVaaUpdate struct {
AppID string `bson:"appId"`
Result interface{} `bson:"result"`
UpdatedAt *time.Time `bson:"updatedAt"`
Timestamp time.Time `bson:"-"`
}

View File

@ -0,0 +1,83 @@
package processor
import (
"context"
"github.com/mitchellh/mapstructure"
"github.com/wormhole-foundation/wormhole-explorer/parser/metrics"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
"go.uber.org/zap"
)
const (
portalTokenBridgeAppID = "PORTAL_TOKEN_BRIDGE"
transferPayloadType = 1
attestMetaPayloadType = 2
transferWithPayloadPayloadType = 3
)
type portalTokenBridgePayload struct {
PayloadType int `mapstructure:"payloadType"`
Amount *uint64 `mapstructure:"amount"`
ToChainID *uint16 `mapstructure:"toChain"`
}
type Processor struct {
repository *parser.Repository
metrics *metrics.Metrics
logger *zap.Logger
}
func New(repository *parser.Repository, metrics *metrics.Metrics, logger *zap.Logger) *Processor {
return &Processor{
repository: repository,
metrics: metrics,
logger: logger,
}
}
func (p *Processor) Process(ctx context.Context, vaaParsed *parser.ParsedVaaUpdate) error {
err := p.repository.UpsertParsedVaa(ctx, *vaaParsed)
if err != nil {
p.logger.Error("Error inserting vaa in repository",
zap.String("id", vaaParsed.ID),
zap.Error(err))
return err
}
p.logger.Info("Vaa save in repository", zap.String("id", vaaParsed.ID))
if vaaParsed.AppID == portalTokenBridgeAppID {
input, ok := vaaParsed.Result.(map[string]interface{})
if ok {
var result portalTokenBridgePayload
err := mapstructure.Decode(input, &result)
if err != nil {
p.logger.Warn("Decoding map to payload struct", zap.String("id", vaaParsed.ID), zap.Error(err))
return nil
}
if result.PayloadType == transferPayloadType || result.PayloadType == transferWithPayloadPayloadType {
if result.Amount == nil || result.ToChainID == nil {
p.logger.Warn("amount or toChain are empty", zap.String("id", vaaParsed.ID), zap.Any("payload", input))
return nil
}
metric := &metrics.Volume{
ChainSourceID: vaaParsed.EmitterChain,
ChainDestinationID: *result.ToChainID,
Value: *result.Amount,
Timestamp: vaaParsed.Timestamp,
AppID: vaaParsed.AppID,
}
err := p.metrics.PushVolume(ctx, metric)
if err != nil {
return err
}
}
} else {
p.logger.Warn("Casting parsed vaa to map", zap.String("id", vaaParsed.ID))
}
}
return nil
}

10
parser/processor/types.go Normal file
View File

@ -0,0 +1,10 @@
package processor
import (
"context"
"github.com/wormhole-foundation/wormhole-explorer/parser/parser"
)
// ProcessorFunc is a function to process ParsedVaaUpdate
type ProcessorFunc func(context.Context, *parser.ParsedVaaUpdate) error

View File

@ -3,10 +3,11 @@ FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a
WORKDIR /app
COPY . .
COPY pipeline pipeline
COPY common common
# Build the Go app
RUN make build
RUN cd pipeline && make build
############################
# STEP 2 build a small image
@ -15,6 +16,6 @@ FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/pipeline-app" "/pipeline"
COPY --from=build "/app/pipeline/pipeline-app" "/pipeline"
# Run the binary.
ENTRYPOINT ["/pipeline"]

View File

@ -3,10 +3,11 @@ FROM --platform=linux/amd64 docker.io/golang:1.19.2@sha256:0467d7d12d170ed8d998a
WORKDIR /app
COPY . .
COPY spy spy
COPY common common
# Build the Go app
RUN CGO_ENABLED=0 GOOS=linux go build -o "./spy" cmd/main.go
RUN cd spy && CGO_ENABLED=0 GOOS=linux go build -o "./spy" cmd/main.go
############################
# STEP 2 build a small image
@ -15,6 +16,6 @@ FROM alpine
#Copy certificates
COPY --from=build /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/
# Copy our static executable.
COPY --from=build "/app/spy" "/spy"
COPY --from=build "/app/spy/spy" "/spy"
# Run the binary.
ENTRYPOINT ["/spy"]