242 lines
7.9 KiB
Go
242 lines
7.9 KiB
Go
package p
|
|
|
|
import (
|
|
"context"
|
|
"encoding/binary"
|
|
"encoding/hex"
|
|
"fmt"
|
|
"log"
|
|
"math"
|
|
"strconv"
|
|
"time"
|
|
|
|
"github.com/certusone/wormhole/node/pkg/vaa"
|
|
"github.com/cosmos/cosmos-sdk/types/bech32"
|
|
|
|
"cloud.google.com/go/bigtable"
|
|
|
|
"github.com/gagliardetto/solana-go"
|
|
)
|
|
|
|
// terra native tokens do not have a bech32 address like cw20s do, handle them manually.
|
|
var tokenAddressExceptions = map[string]string{
|
|
"0100000000000000000000000000000000000000000000000000000075757364": "ust",
|
|
"010000000000000000000000000000000000000000000000000000756c756e61": "uluna",
|
|
}
|
|
|
|
// returns a pair of dates before and after the input time.
|
|
// useful for creating a time rage for querying historical price APIs.
|
|
func rangeFromTime(t time.Time, hours int) (start time.Time, end time.Time) {
|
|
duration := time.Duration(hours) * time.Hour
|
|
return t.Add(-duration), t.Add(duration)
|
|
}
|
|
|
|
func transformHexAddressToNative(chain vaa.ChainID, address string) string {
|
|
switch chain {
|
|
case vaa.ChainIDSolana:
|
|
addr, err := hex.DecodeString(address)
|
|
if err != nil {
|
|
log.Fatalf("failed to decode solana string: %v", err)
|
|
}
|
|
if len(addr) != 32 {
|
|
log.Fatalf("address must be 32 bytes. address: %v", address)
|
|
}
|
|
solPk := solana.PublicKeyFromBytes(addr[:])
|
|
return solPk.String()
|
|
case vaa.ChainIDEthereum,
|
|
vaa.ChainIDBSC,
|
|
vaa.ChainIDPolygon:
|
|
addr := fmt.Sprintf("0x%v", address[(len(address)-40):])
|
|
return addr
|
|
case vaa.ChainIDTerra:
|
|
// handle terra native assets manually
|
|
if val, ok := tokenAddressExceptions[address]; ok {
|
|
return val
|
|
}
|
|
|
|
trimmed := address[(len(address) - 40):]
|
|
data, decodeErr := hex.DecodeString(trimmed)
|
|
if decodeErr != nil {
|
|
fmt.Printf("failed to decode unpadded string: %v\n", decodeErr)
|
|
}
|
|
encodedAddr, convertErr := bech32.ConvertAndEncode("terra", data)
|
|
if convertErr != nil {
|
|
fmt.Println("convert error from cosmos bech32. err", convertErr)
|
|
}
|
|
return encodedAddr
|
|
default:
|
|
panic(fmt.Errorf("cannot process address for unknown chain: %v", chain))
|
|
}
|
|
}
|
|
|
|
// ProcessTransfer is triggered by a PubSub message, once a TokenTransferPayload is written to a row.
|
|
func ProcessTransfer(ctx context.Context, m PubSubMessage) error {
|
|
data := string(m.Data)
|
|
if data == "" {
|
|
return fmt.Errorf("no data to process in message")
|
|
}
|
|
|
|
signedVaa, err := vaa.Unmarshal(m.Data)
|
|
if err != nil {
|
|
log.Println("failed Unmarshaling VAA")
|
|
return err
|
|
}
|
|
|
|
// create the bigtable identifier from the VAA data
|
|
rowKey := makeRowKey(signedVaa.EmitterChain, signedVaa.EmitterAddress, signedVaa.Sequence)
|
|
row, err := tbl.ReadRow(ctx, rowKey)
|
|
if err != nil {
|
|
log.Fatalf("Could not read row with key %s: %v", rowKey, err)
|
|
}
|
|
|
|
// get the payload data for this transfer
|
|
var tokenAddress string
|
|
var tokenChain vaa.ChainID
|
|
var amount string
|
|
for _, item := range row[columnFamilies[2]] {
|
|
switch item.Column {
|
|
case "TokenTransferPayload:OriginAddress":
|
|
tokenAddress = string(item.Value)
|
|
case "TokenTransferPayload:OriginChain":
|
|
chainInt, _ := strconv.ParseUint(string(item.Value), 10, 32)
|
|
chainID := vaa.ChainID(chainInt)
|
|
tokenChain = chainID
|
|
case "TokenTransferPayload:Amount":
|
|
amount = string(item.Value)
|
|
}
|
|
}
|
|
|
|
// lookup the asset meta for this transfer.
|
|
// find an AssetMeta message that matches the OriginChain & TokenAddress of the transfer
|
|
var result bigtable.Row
|
|
chainIDPrefix := fmt.Sprintf("%d", tokenChain) // create a string containing the tokenChain chainID, ie "2"
|
|
queryErr := tbl.ReadRows(ctx, bigtable.PrefixRange(chainIDPrefix), func(row bigtable.Row) bool {
|
|
result = row
|
|
return true
|
|
}, bigtable.RowFilter(
|
|
bigtable.ChainFilters(
|
|
bigtable.FamilyFilter(columnFamilies[3]),
|
|
bigtable.ColumnFilter("TokenAddress"),
|
|
bigtable.ValueFilter(tokenAddress),
|
|
)))
|
|
|
|
if queryErr != nil {
|
|
log.Fatalf("failed to read rows: %v", queryErr)
|
|
}
|
|
if result == nil {
|
|
log.Printf("did not find AssetMeta row for tokenAddress: %v. Transfer rowKey: %v\n", tokenAddress, rowKey)
|
|
return fmt.Errorf("did not find AssetMeta row for tokenAddress %v", tokenAddress)
|
|
}
|
|
// now get the entire row
|
|
assetMetaRow, assetMetaErr := tbl.ReadRow(ctx, result.Key(), bigtable.RowFilter(bigtable.LatestNFilter(1)))
|
|
if assetMetaErr != nil {
|
|
log.Fatalf("Could not read row with key %s: %v", rowKey, assetMetaErr)
|
|
}
|
|
if _, ok := assetMetaRow[columnFamilies[3]]; !ok {
|
|
log.Println("did not find AssetMeta matching TokenAddress", tokenAddress)
|
|
return fmt.Errorf("did not find AssetMeta matching TokenAddress %v", tokenAddress)
|
|
}
|
|
|
|
// get AssetMeta values
|
|
var decimals int
|
|
var symbol string
|
|
var name string
|
|
var coinId string
|
|
var nativeTokenAddress string
|
|
for _, item := range assetMetaRow[columnFamilies[3]] {
|
|
switch item.Column {
|
|
case "AssetMetaPayload:Decimals":
|
|
decimalStr := string(item.Value)
|
|
dec, err := strconv.Atoi(decimalStr)
|
|
if err != nil {
|
|
log.Fatalf("failed parsing decimals of row %v", assetMetaRow.Key())
|
|
}
|
|
decimals = dec
|
|
case "AssetMetaPayload:Symbol":
|
|
symbol = string(item.Value)
|
|
case "AssetMetaPayload:Name":
|
|
name = string(item.Value)
|
|
case "AssetMetaPayload:CoinGeckoCoinId":
|
|
coinId = string(item.Value)
|
|
case "AssetMetaPayload:NativeAddress":
|
|
nativeTokenAddress = string(item.Value)
|
|
}
|
|
}
|
|
if coinId == "" {
|
|
log.Printf("no coinId for symbol: %v, nothing to lookup.\n", symbol)
|
|
// no coinId for this asset, cannot get price from coingecko.
|
|
return nil
|
|
}
|
|
|
|
// transfers created by the bridge UI will have at most 8 decimals.
|
|
if decimals > 8 {
|
|
decimals = 8
|
|
}
|
|
// ensure amount string is long enough
|
|
if len(amount) < decimals {
|
|
amount = fmt.Sprintf("%0*v", decimals, amount)
|
|
}
|
|
|
|
intAmount := amount[:len(amount)-decimals]
|
|
decAmount := amount[len(amount)-decimals:]
|
|
calculatedAmount := intAmount + "." + decAmount
|
|
|
|
timestamp := signedVaa.Timestamp.UTC()
|
|
price, _ := fetchCoinGeckoPrice(coinId, timestamp)
|
|
|
|
if price == 0 {
|
|
// no price found, don't save
|
|
log.Printf("no price for symbol: %v, name: %v, address: %v, at: %v. rowKey: %v\n", symbol, name, nativeTokenAddress, timestamp.String(), rowKey)
|
|
return nil
|
|
}
|
|
|
|
// convert the amount string so it can be used for math
|
|
amountFloat, convErr := strconv.ParseFloat(calculatedAmount, 64)
|
|
if convErr != nil {
|
|
log.Fatalf("failed parsing calculatedAmount '%v' to float64. err %v", calculatedAmount, convErr)
|
|
}
|
|
notional := amountFloat * price
|
|
notionalStr := fmt.Sprintf("%f", notional)
|
|
|
|
log.Printf("processed transfer of $%0.2f = %v %v * $%0.2f\n", notional, calculatedAmount, symbol, price)
|
|
|
|
// write to BigTable
|
|
colFam := columnFamilies[5]
|
|
mutation := bigtable.NewMutation()
|
|
ts := bigtable.Now()
|
|
|
|
mutation.Set(colFam, "Amount", ts, []byte(calculatedAmount))
|
|
mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(decimals)))
|
|
var notionalbuf [8]byte
|
|
binary.BigEndian.PutUint64(notionalbuf[:], math.Float64bits(notional))
|
|
mutation.Set(colFam, "NotionalUSD", ts, notionalbuf[:])
|
|
mutation.Set(colFam, "NotionalUSDStr", ts, []byte(notionalStr))
|
|
var priceBuf [8]byte
|
|
binary.BigEndian.PutUint64(priceBuf[:], math.Float64bits(price))
|
|
mutation.Set(colFam, "TokenPriceUSD", ts, priceBuf[:])
|
|
mutation.Set(colFam, "TokenPriceUSDStr", ts, []byte(fmt.Sprintf("%f", price)))
|
|
mutation.Set(colFam, "TransferTimestamp", ts, []byte(timestamp.String()))
|
|
mutation.Set(colFam, "OriginSymbol", ts, []byte(symbol))
|
|
mutation.Set(colFam, "OriginName", ts, []byte(name))
|
|
mutation.Set(colFam, "OriginTokenAddress", ts, []byte(nativeTokenAddress))
|
|
|
|
// TODO - find the symbol & name of the asset on the target chain?
|
|
// mutation.Set(colFam, "TargetSymbol", ts, []byte())
|
|
// mutation.Set(colFam, "TargetName", ts, []byte())
|
|
|
|
// conditional mutation - don't write if row already has an Amount value.
|
|
filter := bigtable.ChainFilters(
|
|
bigtable.FamilyFilter(colFam),
|
|
bigtable.ColumnFilter("Amount"))
|
|
conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation)
|
|
|
|
writeErr := tbl.Apply(ctx, rowKey, conditionalMutation)
|
|
if writeErr != nil {
|
|
log.Printf("Failed to write TokenTransferDetails for %v to BigTable. err: %v\n", rowKey, writeErr)
|
|
return writeErr
|
|
}
|
|
|
|
// success
|
|
return nil
|
|
}
|