From d45addcd592a940107423c65e517992128812d0c Mon Sep 17 00:00:00 2001 From: ftocal <46001274+ftocal@users.noreply.github.com> Date: Tue, 24 Oct 2023 17:11:37 -0300 Subject: [PATCH] Fix csv transfer report (#764) --- jobs/go.mod | 2 +- jobs/go.sum | 4 +- jobs/jobs/report/transfer_report.go | 117 ++++++++++++++++++++-------- 3 files changed, 87 insertions(+), 36 deletions(-) diff --git a/jobs/go.mod b/jobs/go.mod index dee806e4..a0b2ee91 100644 --- a/jobs/go.mod +++ b/jobs/go.mod @@ -39,7 +39,7 @@ require ( go.uber.org/multierr v1.6.0 // indirect golang.org/x/crypto v0.7.0 // indirect golang.org/x/sync v0.1.0 // indirect - golang.org/x/sys v0.9.0 // indirect + golang.org/x/sys v0.13.0 // indirect golang.org/x/text v0.8.0 // indirect ) diff --git a/jobs/go.sum b/jobs/go.sum index 8265e264..e9fd1fa3 100644 --- a/jobs/go.sum +++ b/jobs/go.sum @@ -108,8 +108,8 @@ golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20211019181941-9d821ace8654/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= -golang.org/x/sys v0.9.0 h1:KS/R3tvhPqvJvwcKfnBHJwwthS11LRhmM5D59eEXa0s= -golang.org/x/sys v0.9.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= +golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= diff --git a/jobs/jobs/report/transfer_report.go b/jobs/jobs/report/transfer_report.go index e816c1d2..f67c2b87 100644 --- a/jobs/jobs/report/transfer_report.go +++ b/jobs/jobs/report/transfer_report.go @@ -2,9 +2,11 @@ package report import ( "context" + "encoding/csv" "fmt" "math/big" "os" + "regexp" "time" "github.com/shopspring/decimal" @@ -25,20 +27,20 @@ type TransferReportJob struct { } type transactionResult struct { - ID string `bson:"_id"` - SourceChain int `bson:"sourceChain"` - EmitterAddress string `bson:"emitterAddress"` - Sequence string `bson:"sequence"` - DestinationChain int `bson:"destinationChain"` - TokenChain int `bson:"tokenChain"` - TokenAddress string `bson:"tokenAddress"` - TokenAddressHexa string `bson:"tokenAddressHexa"` - Amount string `bson:"amount"` - SourceWallet string `bson:"sourceWallet"` - DestinationWallet string `bson:"destinationWallet"` - Fee string `bson:"fee"` - Timestamp time.Time `bson:"timestamp"` - AppIds []string `bson:"appIds"` + ID string `bson:"_id"` + SourceChain sdk.ChainID `bson:"sourceChain"` + EmitterAddress string `bson:"emitterAddress"` + Sequence string `bson:"sequence"` + DestinationChain sdk.ChainID `bson:"destinationChain"` + TokenChain sdk.ChainID `bson:"tokenChain"` + TokenAddress string `bson:"tokenAddress"` + TokenAddressHexa string `bson:"tokenAddressHexa"` + Amount string `bson:"amount"` + SourceWallet string `bson:"sourceWallet"` + DestinationWallet string `bson:"destinationWallet"` + Fee string `bson:"fee"` + Timestamp time.Time `bson:"timestamp"` + AppIds []string `bson:"appIds"` } // NewTransferReportJob creates a new transfer report job. @@ -54,6 +56,10 @@ func (j *TransferReportJob) Run(ctx context.Context) error { return err } + writer := csv.NewWriter(file) + + _ = j.writeHeader(writer) + defer file.Close() //start backfilling @@ -75,7 +81,7 @@ func (j *TransferReportJob) Run(ctx context.Context) error { j.logger.Debug("Processing transaction", zap.String("id", t.ID)) if t.TokenAddressHexa == "" { - j.writeRecord(t, t.Amount, nil, nil, file) + j.writeRecord(t, t.Amount, nil, nil, writer) continue } @@ -95,7 +101,7 @@ func (j *TransferReportJob) Run(ctx context.Context) error { continue } if t.Amount == "" { - j.writeRecord(t, t.Amount, nil, nil, file) + j.writeRecord(t, t.Amount, nil, nil, writer) continue } amount := new(big.Int) @@ -105,26 +111,26 @@ func (j *TransferReportJob) Run(ctx context.Context) error { zap.String("id", t.ID), zap.String("amount", t.Amount), ) - j.writeRecord(t, "", nil, nil, file) + j.writeRecord(t, "", nil, nil, writer) continue } priceUSD := prices.CalculatePriceUSD(tokenPrice, amount, m.Decimals) - j.writeRecord(t, t.Amount, m, &priceUSD, file) + j.writeRecord(t, t.Amount, m, &priceUSD, writer) } else { - j.writeRecord(t, t.Amount, nil, nil, file) + j.writeRecord(t, t.Amount, nil, nil, writer) } } + writer.Flush() page++ } return nil } -func (*TransferReportJob) writeRecord(trx transactionResult, fAmount string, m *domain.TokenMetadata, priceUSD *decimal.Decimal, file *os.File) error { - // vaaId, sourceChain,emitterAddress,sequence,sourceWallet, destinationChain,destinationWallet,tokenChain,tokenAddress,amount,decimals,notionalUSD,fee,coinGeckoId,symbol - var notionalUSD, decimals, symbol, coingeckoID string +func (j *TransferReportJob) writeRecord(trx transactionResult, fAmount string, m *domain.TokenMetadata, priceUSD *decimal.Decimal, file *csv.Writer) error { + var notionalUSD, decimals, symbol, coingeckoID, tokenAddress string if m != nil { decimals = fmt.Sprintf("%d", m.Decimals) symbol = m.Symbol.String() @@ -133,12 +139,49 @@ func (*TransferReportJob) writeRecord(trx transactionResult, fAmount string, m * if priceUSD != nil { notionalUSD = priceUSD.Truncate(10).String() } - line := fmt.Sprintf("%s,%d,%s,%s,%s,%d,%s,%d,%s,%s,%s,%s,%s,%s,%s\n", - trx.ID, trx.SourceChain, trx.EmitterAddress, trx.Sequence, trx.SourceWallet, trx.DestinationChain, - trx.DestinationWallet, trx.TokenChain, trx.TokenAddress, fAmount, decimals, - notionalUSD, trx.Fee, coingeckoID, symbol) - _, err := file.WriteString(line) - return err + + tokenAddress = trx.TokenAddress + if !regexp.MustCompile(`^[A-Za-z0-9]*$`).MatchString(tokenAddress) { + tokenAddress, _ = domain.TranslateEmitterAddress(trx.TokenChain, trx.TokenAddressHexa) + } + + var record []string + record = append(record, trx.ID) + record = append(record, chainIDToCsv(trx.SourceChain)) + record = append(record, trx.EmitterAddress) + record = append(record, trx.Sequence) + record = append(record, trx.SourceWallet) + record = append(record, chainIDToCsv(trx.DestinationChain)) + record = append(record, trx.DestinationWallet) + record = append(record, chainIDToCsv(trx.TokenChain)) + record = append(record, tokenAddress) + record = append(record, fAmount) + record = append(record, decimals) + record = append(record, notionalUSD) + record = append(record, trx.Fee) + record = append(record, coingeckoID) + record = append(record, symbol) + return file.Write(record) +} + +func (*TransferReportJob) writeHeader(writer *csv.Writer) error { + var record []string + record = append(record, "vaaId") + record = append(record, "sourceChain") + record = append(record, "emitterAddress") + record = append(record, "sequence") + record = append(record, "sourceWallet") + record = append(record, "destinationChain") + record = append(record, "destinationWallet") + record = append(record, "tokenChain") + record = append(record, "tokenAddress") + record = append(record, "amount") + record = append(record, "decimals") + record = append(record, "notionalUSD") + record = append(record, "fee") + record = append(record, "coinGeckoId") + record = append(record, "symbol") + return writer.Write(record) } func (j *TransferReportJob) findTransactionsByPage(ctx context.Context, page, pageSize int64) ([]transactionResult, error) { @@ -187,11 +230,11 @@ func (j *TransferReportJob) findTransactionsByPage(ctx context.Context, page, pa // add nested fields pipeline = append(pipeline, bson.D{ - {"$addFields", bson.D{ - {"standardizedProperties", bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.rawStandardizedProperties", 0}}}, - {"globalTransactions", bson.M{"$arrayElemAt": []interface{}{"$globalTransactions", 0}}}, - {"appIds", bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.appIds", 0}}}, - {"parsedPayload", bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.parsedPayload", 0}}}, + {Key: "$addFields", Value: bson.D{ + {Key: "standardizedProperties", Value: bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.rawStandardizedProperties", 0}}}, + {Key: "globalTransactions", Value: bson.M{"$arrayElemAt": []interface{}{"$globalTransactions", 0}}}, + {Key: "appIds", Value: bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.appIds", 0}}}, + {Key: "parsedPayload", Value: bson.M{"$arrayElemAt": []interface{}{"$parsedVaa.parsedPayload", 0}}}, }}, }) @@ -229,3 +272,11 @@ func (j *TransferReportJob) findTransactionsByPage(ctx context.Context, page, pa return documents, nil } + +func chainIDToCsv(chainID sdk.ChainID) string { + + if chainID.String() == sdk.ChainIDUnset.String() { + return "" + } + return fmt.Sprintf("%d", int16(chainID)) +}