From 28fc5198b00ab82e441e53b270b7b675042ee37d Mon Sep 17 00:00:00 2001 From: ftocal <46001274+ftocal@users.noreply.github.com> Date: Fri, 20 Oct 2023 18:30:34 -0300 Subject: [PATCH] New job for transfer report (#756) Co-authored-by: walker-16 --- common/prices/prices.go | 75 +++++++++ common/prices/volume.go | 24 +++ jobs/cmd/main.go | 31 +++- jobs/config/config.go | 41 ++++- jobs/go.mod | 15 +- jobs/go.sum | 40 ++++- jobs/jobs/jobs.go | 3 +- jobs/jobs/report/transfer_report.go | 231 ++++++++++++++++++++++++++++ 8 files changed, 450 insertions(+), 10 deletions(-) create mode 100644 common/prices/prices.go create mode 100644 common/prices/volume.go create mode 100644 jobs/jobs/report/transfer_report.go diff --git a/common/prices/prices.go b/common/prices/prices.go new file mode 100644 index 00000000..70c634a9 --- /dev/null +++ b/common/prices/prices.go @@ -0,0 +1,75 @@ +package prices + +import ( + "bufio" + "fmt" + "os" + "strings" + "time" + + "github.com/shopspring/decimal" +) + +type CoinPricesCache struct { + filename string + Prices map[string]decimal.Decimal +} + +func NewCoinPricesCache(priceFile string) *CoinPricesCache { + return &CoinPricesCache{ + filename: priceFile, + Prices: make(map[string]decimal.Decimal), + } +} + +func (c *CoinPricesCache) GetPriceByTime(coingeckoID string, day time.Time) (decimal.Decimal, error) { + + // remove hours and minutes, + // times are in UTC + day = time.Date(day.Year(), day.Month(), day.Day(), 0, 0, 0, 0, time.UTC) + + // look up the price + key := fmt.Sprintf("%s%d", coingeckoID, day.UnixMilli()) + if price, ok := c.Prices[key]; ok { + return price, nil + } + + return decimal.NewFromInt(0), fmt.Errorf("price not found for %s", key) +} + +// load the csv file with prices into a map +func (cpc *CoinPricesCache) InitCache() { + + // open prices file + file := cpc.filename + f, err := os.Open(file) + if err != nil { + panic(err) + } + defer f.Close() + + // read line by line + scanner := bufio.NewScanner(f) + for scanner.Scan() { + + // split line by comma + row := scanner.Text() + tokens := strings.Split(row, ",") + if len(tokens) != 5 { + panic(fmt.Errorf("invalid line: %s", row)) + } + + // build map key: coingecko_id+timestamp + key := fmt.Sprintf("%s%s", tokens[1], tokens[3]) + + // parse price + price, err := decimal.NewFromString(tokens[4]) + if err != nil { + msg := fmt.Sprintf("failed to parse price err=%v line=%s", err, row) + panic(msg) + } + + cpc.Prices[key] = price + } + +} diff --git a/common/prices/volume.go b/common/prices/volume.go new file mode 100644 index 00000000..8af155e7 --- /dev/null +++ b/common/prices/volume.go @@ -0,0 +1,24 @@ +package prices + +import ( + "math/big" + + "github.com/shopspring/decimal" +) + +// CalculatePriceUSD calculates the price in USD for a given notional value and amount of tokens +func CalculatePriceUSD(notionalUSD decimal.Decimal, amount *big.Int, decimals int64) decimal.Decimal { + + var exp int32 + if decimals > 8 { + exp = 8 + } else { + exp = int32(decimals) + } + tokenAmount := decimal.NewFromBigInt(amount, -exp) + + // Compute the amount in USD + usdAmount := tokenAmount.Mul(notionalUSD) + + return usdAmount +} diff --git a/jobs/cmd/main.go b/jobs/cmd/main.go index 415d124c..7c9f7e11 100644 --- a/jobs/cmd/main.go +++ b/jobs/cmd/main.go @@ -6,11 +6,14 @@ import ( "os" "github.com/go-redis/redis" + "github.com/wormhole-foundation/wormhole-explorer/common/dbutil" "github.com/wormhole-foundation/wormhole-explorer/common/logger" + "github.com/wormhole-foundation/wormhole-explorer/common/prices" "github.com/wormhole-foundation/wormhole-explorer/jobs/config" "github.com/wormhole-foundation/wormhole-explorer/jobs/internal/coingecko" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs" "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/notional" + "github.com/wormhole-foundation/wormhole-explorer/jobs/jobs/report" "go.uber.org/zap" ) @@ -32,8 +35,20 @@ func main() { var err error switch cfg.JobID { case jobs.JobIDNotional: - notionalJob := initNotionalJob(context, cfg, logger) + nCfg, errCfg := config.NewNotionalConfiguration(context) + if errCfg != nil { + log.Fatal("error creating config", errCfg) + } + notionalJob := initNotionalJob(context, nCfg, logger) err = notionalJob.Run() + case jobs.JobIDTransferReport: + aCfg, errCfg := config.NewTransferReportConfiguration(context) + if errCfg != nil { + log.Fatal("error creating config", errCfg) + } + transferReport := initTransferReportJob(context, aCfg, logger) + err = transferReport.Run(context) + default: logger.Fatal("Invalid job id", zap.String("job_id", cfg.JobID)) } @@ -47,7 +62,7 @@ func main() { } // initNotionalJob initializes notional job. -func initNotionalJob(ctx context.Context, cfg *config.Configuration, logger *zap.Logger) *notional.NotionalJob { +func initNotionalJob(ctx context.Context, cfg *config.NotionalConfiguration, logger *zap.Logger) *notional.NotionalJob { // init coingecko api client. api := coingecko.NewCoingeckoAPI(cfg.CoingeckoURL) // init redis client. @@ -57,6 +72,18 @@ func initNotionalJob(ctx context.Context, cfg *config.Configuration, logger *zap return notionalJob } +// initTransferReportJob initializes transfer report job. +func initTransferReportJob(ctx context.Context, cfg *config.TransferReportConfiguration, logger *zap.Logger) *report.TransferReportJob { + //setup DB connection + db, err := dbutil.Connect(ctx, logger, cfg.MongoURI, cfg.MongoDatabase, false) + if err != nil { + logger.Fatal("Failed to connect MongoDB", zap.Error(err)) + } + pricesCache := prices.NewCoinPricesCache(cfg.PricesPath) + pricesCache.InitCache() + return report.NewTransferReportJob(db.Database, cfg.PageSize, pricesCache, cfg.OutputPath, logger) +} + func handleExit() { if r := recover(); r != nil { if e, ok := r.(exitCode); ok { diff --git a/jobs/config/config.go b/jobs/config/config.go index 974feeb9..97bac578 100644 --- a/jobs/config/config.go +++ b/jobs/config/config.go @@ -11,9 +11,12 @@ import ( // Configuration is the configuration for the job type Configuration struct { + JobID string `env:"JOB_ID,required"` + LogLevel string `env:"LOG_LEVEL,default=INFO"` +} + +type NotionalConfiguration struct { Environment string `env:"ENVIRONMENT,required"` - LogLevel string `env:"LOG_LEVEL,default=INFO"` - JobID string `env:"JOB_ID,required"` CoingeckoURL string `env:"COINGECKO_URL,required"` CacheURL string `env:"CACHE_URL,required"` CachePrefix string `env:"CACHE_PREFIX,required"` @@ -21,7 +24,15 @@ type Configuration struct { P2pNetwork string `env:"P2P_NETWORK,required"` } -// New creates a configuration with the values from .env file and environment variables. +type TransferReportConfiguration struct { + MongoURI string `env:"MONGODB_URI,required"` + MongoDatabase string `env:"MONGODB_DATABASE,required"` + PageSize int64 `env:"PAGE_SIZE,default=100"` + PricesPath string `env:"PRICES_PATH,required"` + OutputPath string `env:"OUTPUT_PATH,required"` +} + +// New creates a default configuration with the values from .env file and environment variables. func New(ctx context.Context) (*Configuration, error) { _ = godotenv.Load(".env", "../.env") @@ -32,3 +43,27 @@ func New(ctx context.Context) (*Configuration, error) { return &configuration, nil } + +// New creates a notional configuration with the values from .env file and environment variables. +func NewNotionalConfiguration(ctx context.Context) (*NotionalConfiguration, error) { + _ = godotenv.Load(".env", "../.env") + + var configuration NotionalConfiguration + if err := envconfig.Process(ctx, &configuration); err != nil { + return nil, err + } + + return &configuration, nil +} + +// New creates a transfer report configuration with the values from .env file and environment variables. +func NewTransferReportConfiguration(ctx context.Context) (*TransferReportConfiguration, error) { + _ = godotenv.Load(".env", "../.env") + + var configuration TransferReportConfiguration + if err := envconfig.Process(ctx, &configuration); err != nil { + return nil, err + } + + return &configuration, nil +} diff --git a/jobs/go.mod b/jobs/go.mod index 29251198..dee806e4 100644 --- a/jobs/go.mod +++ b/jobs/go.mod @@ -8,6 +8,8 @@ require ( github.com/sethvargo/go-envconfig v0.9.0 github.com/shopspring/decimal v1.3.1 github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230713181709-0425a89e7533 + github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 + go.mongodb.org/mongo-driver v1.11.2 go.uber.org/zap v1.24.0 ) @@ -21,15 +23,24 @@ require ( github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/ethereum/go-ethereum v1.10.21 // indirect github.com/go-redis/redis/v8 v8.11.5 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/go-cmp v0.5.9 // indirect github.com/holiman/uint256 v1.2.1 // indirect + github.com/klauspost/compress v1.16.3 // indirect + github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/onsi/gomega v1.27.6 // indirect - github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 // indirect + github.com/pkg/errors v0.9.1 // indirect + github.com/xdg-go/pbkdf2 v1.0.0 // indirect + github.com/xdg-go/scram v1.1.1 // indirect + github.com/xdg-go/stringprep v1.0.3 // indirect + github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d // indirect go.uber.org/atomic v1.7.0 // indirect go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.7.0 // indirect + golang.org/x/sync v0.1.0 // indirect golang.org/x/sys v0.9.0 // indirect + golang.org/x/text v0.8.0 // indirect ) -replace github.com/wormhole-foundation/wormhole-explorer/common => ../common \ No newline at end of file +replace github.com/wormhole-foundation/wormhole-explorer/common => ../common diff --git a/jobs/go.sum b/jobs/go.sum index 5825cb16..8265e264 100644 --- a/jobs/go.sum +++ b/jobs/go.sum @@ -27,12 +27,26 @@ github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGK github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= github.com/go-redis/redis/v8 v8.11.5 h1:AcZZR7igkdvfVmQTPnu9WE37LRrO/YrBH5zWyjDC0oI= github.com/go-redis/redis/v8 v8.11.5/go.mod h1:gREzHqY1hg6oD9ngVRbLStwAWKhA0FEgq8Jd4h5lpwo= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/google/go-cmp v0.5.2/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/holiman/uint256 v1.2.1 h1:XRtyuda/zw2l+Bq/38n5XUoEF72aSOu/77Thd9pPp2o= github.com/holiman/uint256 v1.2.1/go.mod h1:y4ga/t+u+Xwd7CpDgZESaRcWy0I7XMlTMA25ApIH5Jw= github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= +github.com/klauspost/compress v1.13.6/go.mod h1:/3/Vjq9QcHkK5uEr5lBEmyoZ1iFhe47etQ6QUkpK6sk= +github.com/klauspost/compress v1.16.3 h1:XuJt9zzcnaz6a16/OU53ZjWp/v7/42WcR5t2a0PcNQY= +github.com/klauspost/compress v1.16.3/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/kr/pretty v0.1.0 h1:L/CwN0zerZDmRFUapSPitk6f+Q3+0za1rQkzVuMiMFI= +github.com/kr/pretty v0.1.0/go.mod h1:dAy3ld7l9f0ibDNOQOHHMYYIIbhfbHSm3C4ZsoJORNo= +github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ= +github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe h1:iruDEfMl2E6fbMZ9s0scYfZQ84/6SPL6zC8ACM2oIL0= +github.com/montanaflynn/stats v0.0.0-20171201202039-1bf9dbcd8cbe/go.mod h1:wL8QJuTMNUDYhXwkmfOly8iTdp5TEcJFWZD2D7SIkUc= github.com/mr-tron/base58 v1.2.0 h1:T/HDJBh4ZCPbU39/+c3rRvE0uKBQlU27+QI8LJ4t64o= github.com/mr-tron/base58 v1.2.0/go.mod h1:BinMc/sQntlIE1frQmRFPUoPA1Zkr8VRgBdjWI2mNwc= github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE= @@ -40,6 +54,7 @@ github.com/onsi/ginkgo v1.16.5 h1:8xi0RTUf59SOSfEtZMvwTvXYMzG4gV23XVHOZiXNtnE= github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/sethvargo/go-envconfig v0.9.0 h1:Q6FQ6hVEeTECULvkJZakq3dZMeBQ3JUpcKMfPQbKMDE= @@ -48,13 +63,24 @@ github.com/shopspring/decimal v1.3.1 h1:2Usl1nmF/WZucqkFZhnfFYxxxu8LG21F6nPQBE5g github.com/shopspring/decimal v1.3.1/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= +github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= github.com/test-go/testify v1.1.4 h1:Tf9lntrKUMHiXQ07qBScBTSA0dhYQlu83hswqelv1iE= -github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230713181709-0425a89e7533 h1:UpHS7v46L1WIcVyJAUoEHkyL7TV5rODsja0yoaakKVg= -github.com/wormhole-foundation/wormhole-explorer/common v0.0.0-20230713181709-0425a89e7533/go.mod h1:18WiwmzCqiQ2V1TlAYyMjkrW+qD3vKfmctqGWbGAbC0= +github.com/tidwall/pretty v1.0.0 h1:HsD+QiTn7sK6flMKIvNmpqz1qrpP3Ps6jOKIKMooyg4= +github.com/tidwall/pretty v1.0.0/go.mod h1:XNkn88O1ChpSDQmQeStsy+sBenx6DDtFZJxhVysOjyk= github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 h1:rrOyHd+H9a6Op1iUyZNCaI5v9D1syq8jDAYyX/2Q4L8= github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8/go.mod h1:dE12DOucCq23gjGGGhtbyx41FBxuHxjpPvG+ArO+8t0= +github.com/xdg-go/pbkdf2 v1.0.0 h1:Su7DPu48wXMwC3bs7MCNG+z4FhcyEuz5dlvchbq0B0c= +github.com/xdg-go/pbkdf2 v1.0.0/go.mod h1:jrpuAogTd400dnrH08LKmI/xc1MbPOebTwRqcT5RDeI= +github.com/xdg-go/scram v1.1.1 h1:VOMT+81stJgXW3CpHyqHN3AXDYIMsx56mEFrB37Mb/E= +github.com/xdg-go/scram v1.1.1/go.mod h1:RaEWvsqvNKKvBPvcKeFjrG2cJqOkHTiyTpzz23ni57g= +github.com/xdg-go/stringprep v1.0.3 h1:kdwGpVNwPFtjs98xCGkHjQtGKh86rDcRZN17QEMCOIs= +github.com/xdg-go/stringprep v1.0.3/go.mod h1:W3f5j4i+9rC0kuIEJL0ky1VpHXQU3ocBgklLGvcBnW8= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d h1:splanxYIlg+5LfHAM6xpdFEAYOk8iySO56hMFq6uLyA= +github.com/youmark/pkcs8 v0.0.0-20181117223130-1be2e3e5546d/go.mod h1:rHwXgn7JulP+udvsHwJoVG1YGAP6VLg4y9I5dyZdqmA= github.com/yuin/goldmark v1.4.1/go.mod h1:mwnBkeHKe2W/ZEtQ+71ViKU8L12m81fl3OWwC1Zlc8k= +go.mongodb.org/mongo-driver v1.11.2 h1:+1v2rDQUWNcGW7/7E0Jvdz51V38XXxJfhzbV17aNHCw= +go.mongodb.org/mongo-driver v1.11.2/go.mod h1:s7p5vEtfbeR1gYi6pnj3c3/urpbLv2T5Sfd6Rp2HBB8= go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= @@ -64,15 +90,19 @@ go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4= golang.org/x/crypto v0.7.0 h1:AvwMYaRytfdeVt3u6mLaxYtErKYjxA2OXjJ1HHq6t3A= golang.org/x/crypto v0.7.0/go.mod h1:pYwdfH91IfpZVANVyUOhSIPZaFoJGxTFbZhFTx+dXZU= golang.org/x/mod v0.6.0-dev.0.20220106191415-9b9b3d81d5e3/go.mod h1:3p9vT2HGsQu2K1YbXdKPJLVgG5VJdoTa1poYQBtP1AY= golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v0D8zg8gWTRqZa9RBIspLL5mdg= golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= +golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y= golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -86,11 +116,17 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.6/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.10/go.mod h1:Uh6Zz+xoGYZom868N8YTex3t7RhtHDBrE8Gzo9bV56E= golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= golang.org/x/xerrors v0.0.0-20200804184101-5ec99f83aff1/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= +gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/jobs/jobs/jobs.go b/jobs/jobs/jobs.go index 0283833b..d130f890 100644 --- a/jobs/jobs/jobs.go +++ b/jobs/jobs/jobs.go @@ -3,7 +3,8 @@ package jobs // JobIDNotional is the job id for notional job. const ( - JobIDNotional = "JOB_NOTIONAL_USD" + JobIDNotional = "JOB_NOTIONAL_USD" + JobIDTransferReport = "JOB_TRANSFER_REPORT" ) // Job is the interface for jobs. diff --git a/jobs/jobs/report/transfer_report.go b/jobs/jobs/report/transfer_report.go new file mode 100644 index 00000000..e816c1d2 --- /dev/null +++ b/jobs/jobs/report/transfer_report.go @@ -0,0 +1,231 @@ +package report + +import ( + "context" + "fmt" + "math/big" + "os" + "time" + + "github.com/shopspring/decimal" + "github.com/wormhole-foundation/wormhole-explorer/common/domain" + "github.com/wormhole-foundation/wormhole-explorer/common/prices" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.uber.org/zap" +) + +type TransferReportJob struct { + database *mongo.Database + pageSize int64 + logger *zap.Logger + pricesCache *prices.CoinPricesCache + outputPath string +} + +type transactionResult struct { + ID string `bson:"_id"` + SourceChain int `bson:"sourceChain"` + EmitterAddress string `bson:"emitterAddress"` + Sequence string `bson:"sequence"` + DestinationChain int `bson:"destinationChain"` + TokenChain int `bson:"tokenChain"` + TokenAddress string `bson:"tokenAddress"` + TokenAddressHexa string `bson:"tokenAddressHexa"` + Amount string `bson:"amount"` + SourceWallet string `bson:"sourceWallet"` + DestinationWallet string `bson:"destinationWallet"` + Fee string `bson:"fee"` + Timestamp time.Time `bson:"timestamp"` + AppIds []string `bson:"appIds"` +} + +// NewTransferReportJob creates a new transfer report job. +func NewTransferReportJob(database *mongo.Database, pageSize int64, pricesCache *prices.CoinPricesCache, outputPath string, logger *zap.Logger) *TransferReportJob { + return &TransferReportJob{database: database, pageSize: pageSize, pricesCache: pricesCache, outputPath: outputPath, logger: logger} +} + +// Run runs the transfer report job. +func (j *TransferReportJob) Run(ctx context.Context) error { + + file, err := os.Create(j.outputPath) + if err != nil { + return err + } + + defer file.Close() + + //start backfilling + page := int64(0) + for { + j.logger.Info("Processing page", zap.Int64("page", page)) + + trxs, err := j.findTransactionsByPage(ctx, page, j.pageSize) + if err != nil { + j.logger.Error("Failed to get transactions", zap.Error(err)) + break + } + + if len(trxs) == 0 { + j.logger.Info("Empty page", zap.Int64("page", page)) + break + } + for _, t := range trxs { + j.logger.Debug("Processing transaction", zap.String("id", t.ID)) + + if t.TokenAddressHexa == "" { + j.writeRecord(t, t.Amount, nil, nil, file) + continue + } + + tokenAddress, err := sdk.StringToAddress(t.TokenAddressHexa) + if err != nil { + j.logger.Error("Failed to get transactions", + zap.String("id", t.ID), + zap.String("TokenAddressHexa", t.TokenAddressHexa), + zap.Error(err)) + continue + } + + m, ok := domain.GetTokenByAddress(sdk.ChainID(t.TokenChain), tokenAddress.String()) + if ok { + tokenPrice, err := j.pricesCache.GetPriceByTime(m.CoingeckoID, t.Timestamp) + if err != nil { + continue + } + if t.Amount == "" { + j.writeRecord(t, t.Amount, nil, nil, file) + continue + } + amount := new(big.Int) + amount, ok := amount.SetString(t.Amount, 10) + if !ok { + j.logger.Error("amount is not a number", + zap.String("id", t.ID), + zap.String("amount", t.Amount), + ) + j.writeRecord(t, "", nil, nil, file) + continue + } + + priceUSD := prices.CalculatePriceUSD(tokenPrice, amount, m.Decimals) + + j.writeRecord(t, t.Amount, m, &priceUSD, file) + } else { + j.writeRecord(t, t.Amount, nil, nil, file) + } + + } + page++ + } + return nil +} + +func (*TransferReportJob) writeRecord(trx transactionResult, fAmount string, m *domain.TokenMetadata, priceUSD *decimal.Decimal, file *os.File) error { + // vaaId, sourceChain,emitterAddress,sequence,sourceWallet, destinationChain,destinationWallet,tokenChain,tokenAddress,amount,decimals,notionalUSD,fee,coinGeckoId,symbol + var notionalUSD, decimals, symbol, coingeckoID string + if m != nil { + decimals = fmt.Sprintf("%d", m.Decimals) + symbol = m.Symbol.String() + coingeckoID = m.CoingeckoID + } + if priceUSD != nil { + notionalUSD = priceUSD.Truncate(10).String() + } + line := fmt.Sprintf("%s,%d,%s,%s,%s,%d,%s,%d,%s,%s,%s,%s,%s,%s,%s\n", + trx.ID, trx.SourceChain, trx.EmitterAddress, trx.Sequence, trx.SourceWallet, trx.DestinationChain, + trx.DestinationWallet, trx.TokenChain, trx.TokenAddress, fAmount, decimals, + notionalUSD, trx.Fee, coingeckoID, symbol) + _, err := file.WriteString(line) + return err +} + +func (j *TransferReportJob) findTransactionsByPage(ctx context.Context, page, pageSize int64) ([]transactionResult, error) { + + vaas := j.database.Collection("vaas") + + skip := page * pageSize + + // Build the aggregation pipeline + var pipeline mongo.Pipeline + + pipeline = append(pipeline, bson.D{ + {Key: "$sort", Value: bson.D{ + bson.E{Key: "timestamp", Value: -1}, + bson.E{Key: "_id", Value: -1}, + }}, + }) + + pipeline = append(pipeline, bson.D{ + {Key: "$lookup", Value: bson.D{ + {Key: "from", Value: "parsedVaa"}, + {Key: "localField", Value: "_id"}, + {Key: "foreignField", Value: "_id"}, + {Key: "as", Value: "parsedVaa"}, + }}, + }) + + pipeline = append(pipeline, bson.D{ + {Key: "$lookup", Value: bson.D{ + {Key: "from", Value: "globalTransactions"}, + {Key: "localField", Value: "_id"}, + {Key: "foreignField", Value: "_id"}, + {Key: "as", Value: "globalTransactions"}, + }}, + }) + + // Skip initial results + pipeline = append(pipeline, bson.D{ + {Key: "$skip", Value: skip}, + }) + + // Limit size of results + pipeline = append(pipeline, bson.D{ + {Key: "$limit", Value: pageSize}, + }) + + // add nested fields + pipeline = append(pipeline, bson.D{ + {"$addFields", bson.D{ + {"standardizedProperties", bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.rawStandardizedProperties", 0}}}, + {"globalTransactions", bson.M{"$arrayElemAt": []interface{}{"$globalTransactions", 0}}}, + {"appIds", bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.appIds", 0}}}, + {"parsedPayload", bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.parsedPayload", 0}}}, + }}, + }) + + pipeline = append(pipeline, bson.D{ + {Key: "$project", Value: bson.D{ + {Key: "appIds", Value: "$appIds"}, + {Key: "sourceChain", Value: "$emitterChain"}, + {Key: "emitterAddress", Value: "$emitterAddr"}, + {Key: "sequence", Value: "$sequence"}, + {Key: "destinationChain", Value: "$standardizedProperties.toChain"}, + {Key: "tokenChain", Value: "$standardizedProperties.tokenChain"}, + {Key: "tokenAddress", Value: "$standardizedProperties.tokenAddress"}, + {Key: "amount", Value: "$standardizedProperties.amount"}, + {Key: "sourceWallet", Value: "$globalTransactions.originTx.from"}, + {Key: "destinationWallet", Value: "$standardizedProperties.toAddress"}, + {Key: "fee", Value: "$standardizedProperties.fee"}, + {Key: "timestamp", Value: "$timestamp"}, + {Key: "tokenAddressHexa", Value: "$parsedPayload.tokenAddress"}, + }}}) + + // Execute the aggregation pipeline + cur, err := vaas.Aggregate(ctx, pipeline) + if err != nil { + j.logger.Error("failed execute aggregation pipeline", zap.Error(err)) + return nil, err + } + + // Read results from cursor + var documents []transactionResult + err = cur.All(ctx, &documents) + if err != nil { + j.logger.Error("failed to decode cursor", zap.Error(err)) + return nil, err + } + + return documents, nil +}