diff --git a/event_database/cloud_functions/external-data.go b/event_database/cloud_functions/external-data.go index 545b4aa6d..2c291dd53 100644 --- a/event_database/cloud_functions/external-data.go +++ b/event_database/cloud_functions/external-data.go @@ -343,13 +343,82 @@ func fetchTokenPrices(ctx context.Context, coinIds []string) map[string]float64 allPrices[coinId] = price } - // CoinGecko rate limit is low (5/second), be very cautious about bursty requests - time.Sleep(time.Millisecond * 200) + coinGeckoRateLimitSleep() } return allPrices } +func coinGeckoRateLimitSleep() { + // CoinGecko rate limit is low (5/second), be very cautious about bursty requests + time.Sleep(time.Millisecond * 200) +} + +// fetchTokenPriceHistories returns the daily prices for coinIds from start to end +func fetchTokenPriceHistories(ctx context.Context, coinIds []string, start time.Time, end time.Time) map[string]map[string]float64 { + log.Printf("Fetching price history for %d tokens\n", len(coinIds)) + priceHistories := map[string]map[string]float64{} + baseUrl := cgBaseUrl + cgApiKey := os.Getenv("COINGECKO_API_KEY") + if cgApiKey != "" { + baseUrl = cgProBaseUrl + } + startTimestamp := start.Unix() + endTimestamp := end.Unix() + for _, coinId := range coinIds { + defer coinGeckoRateLimitSleep() + url := fmt.Sprintf("%vcoins/%v/market_chart/range?vs_currency=usd&from=%v&to=%v", baseUrl, coinId, startTimestamp, endTimestamp) + req, reqErr := http.NewRequest("GET", url, nil) + if reqErr != nil { + log.Fatalf("failed coins request, err: %v\n", reqErr) + } + if cgApiKey != "" { + req.Header.Set("X-Cg-Pro-Api-Key", cgApiKey) + } + + res, resErr := http.DefaultClient.Do(req) + if resErr != nil { + log.Fatalf("failed get coins response, err: %v\n", resErr) + } + defer res.Body.Close() + if res.StatusCode >= 400 { + errorMsg := fmt.Sprintf("failed to get CoinGecko price history for %s, Status: %s", coinId, res.Status) + if res.StatusCode == 404 { + log.Println(errorMsg) + continue + } else { + log.Fatalln(errorMsg) + } + } + + body, bodyErr := ioutil.ReadAll(res.Body) + if bodyErr != nil { + log.Fatalf("failed decoding coins body, err: %v\n", bodyErr) + } + + var parsed CoinGeckoMarketRes + parseErr := json.Unmarshal(body, &parsed) + if parseErr != nil { + log.Printf("fetchTokenPriceHistories failed parsing body. err %v\n", parseErr) + var errRes CoinGeckoErrorRes + if err := json.Unmarshal(body, &errRes); err == nil { + log.Println("Failed calling CoinGecko, got err", errRes.Error) + } + } else { + for _, market := range parsed.Prices { + seconds := int64(market[0]) / 1e3 + date := time.Unix(seconds, 0).Format("2006-01-02") + price := market[1] + if _, ok := priceHistories[date]; !ok { + priceHistories[date] = map[string]float64{} + } + priceHistories[date][coinId] = price + } + } + } + return priceHistories +} + const solanaTokenListURL = "https://raw.githubusercontent.com/solana-labs/token-list/main/src/tokens/solana.tokenlist.json" type SolanaToken struct { diff --git a/event_database/cloud_functions/notional-transferred-to.go b/event_database/cloud_functions/notional-transferred-to.go index d68725a6b..61ee2c489 100644 --- a/event_database/cloud_functions/notional-transferred-to.go +++ b/event_database/cloud_functions/notional-transferred-to.go @@ -31,17 +31,18 @@ var muWarmTransfersToCache sync.RWMutex var warmTransfersToCacheFilePath = "notional-transferred-to-cache.json" type TransferData struct { - TokenSymbol string - TokenName string - TokenAddress string - TokenAmount float64 - CoinGeckoCoinId string - OriginChain string - LeavingChain string - DestinationChain string - Notional float64 - TokenPrice float64 - TokenDecimals int + TokenSymbol string + TokenName string + TokenAddress string + TokenAmount float64 + CoinGeckoCoinId string + OriginChain string + LeavingChain string + DestinationChain string + Notional float64 + TokenPrice float64 + TokenDecimals int + TransferTimestamp string } // finds all the TokenTransfer rows within the specified period @@ -83,6 +84,8 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi t.CoinGeckoCoinId = string(item.Value) case "TokenTransferDetails:Decimals": t.TokenDecimals, _ = strconv.Atoi(string(item.Value)) + case "TokenTransferDetails:TransferTimestamp": + t.TransferTimestamp = string(item.Value) } } @@ -100,7 +103,8 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi keyParts := strings.Split(row.Key(), ":") t.LeavingChain = keyParts[0] - if isTokenAllowed(t.OriginChain, t.TokenAddress) { + transferDateStr := t.TransferTimestamp[0:10] + if isTokenAllowed(t.OriginChain, t.TokenAddress) && isTokenActive(t.OriginChain, t.TokenAddress, transferDateStr) { rows = append(rows, *t) } } @@ -116,8 +120,8 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi bigtable.StripValueFilter(), // no columns/values, just the row.Key() ), bigtable.ChainFilters( - bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])), - bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginName|OriginChain|TargetChain|CoinGeckoCoinId|OriginTokenAddress|TokenPriceUSD|Decimals"), + bigtable.FamilyFilter(fmt.Sprintf("%v|%v", transferPayloadFam, transferDetailsFam)), + bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginName|OriginChain|TargetChain|CoinGeckoCoinId|OriginTokenAddress|TokenPriceUSD|Decimals|TransferTimestamp"), bigtable.LatestNFilter(1), ), bigtable.BlockAllFilter(), diff --git a/event_database/cloud_functions/notional-tvl-cumulative.go b/event_database/cloud_functions/notional-tvl-cumulative.go index 8e7a7ffa9..9cb0f26c3 100644 --- a/event_database/cloud_functions/notional-tvl-cumulative.go +++ b/event_database/cloud_functions/notional-tvl-cumulative.go @@ -27,12 +27,58 @@ var warmTvlCumulativeCacheFilePath = "tvl-cumulative-cache.json" var notionalTvlCumulativeResultPath = "notional-tvl-cumulative.json" +var coinGeckoPriceCacheFilePath = "coingecko-price-cache.json" +var coinGeckoPriceCache = map[string]map[string]float64{} +var loadedCoinGeckoPriceCache bool + // days to be excluded from the TVL result var skipDays = map[string]bool{ // for example: // "2022-02-19": true, } +func loadAndUpdateCoinGeckoPriceCache(ctx context.Context, coinIds []string, now time.Time) { + // at cold-start, load the price cache into memory, and fetch any missing token price histories and add them to the cache + if !loadedCoinGeckoPriceCache { + // load the price cache + loadJsonToInterface(ctx, coinGeckoPriceCacheFilePath, &muWarmTvlCumulativeCache, &coinGeckoPriceCache) + loadedCoinGeckoPriceCache = true + + // find tokens missing price history + missing := []string{} + for _, coinId := range coinIds { + found := false + for _, prices := range coinGeckoPriceCache { + if _, ok := prices[coinId]; ok { + found = true + break + } + } + if !found { + missing = append(missing, coinId) + } + } + + // fetch missing price histories and add them to the cache + priceHistories := fetchTokenPriceHistories(ctx, missing, releaseDay, now) + for date, prices := range priceHistories { + for coinId, price := range prices { + if _, ok := coinGeckoPriceCache[date]; !ok { + coinGeckoPriceCache[date] = map[string]float64{} + } + coinGeckoPriceCache[date][coinId] = price + } + } + } + + // fetch today's latest prices + today := now.Format("2006-01-02") + coinGeckoPriceCache[today] = fetchTokenPrices(ctx, coinIds) + + // write to the cache file + persistInterfaceToJson(ctx, coinGeckoPriceCacheFilePath, &muWarmCumulativeAddressesCache, coinGeckoPriceCache) +} + // calculates a running total of notional value transferred, by symbol, since the start time specified. func createTvlCumulativeOfInterval(tbl *bigtable.Table, ctx context.Context, start time.Time) map[string]map[string]map[string]LockedAsset { if len(warmTvlCumulativeCache) == 0 { @@ -238,6 +284,22 @@ func ComputeTvlCumulative(w http.ResponseWriter, r *http.Request) { transfers := createTvlCumulativeOfInterval(tbl, ctx, start) + coinIdSet := map[string]bool{} + for _, chains := range transfers { + for _, assets := range chains { + for _, asset := range assets { + if asset.CoinGeckoId != "*" { + coinIdSet[asset.CoinGeckoId] = true + } + } + } + } + coinIds := []string{} + for coinId := range coinIdSet { + coinIds = append(coinIds, coinId) + } + loadAndUpdateCoinGeckoPriceCache(ctx, coinIds, now) + // calculate the notional tvl based on the price of the tokens each day for date, chains := range transfers { if _, ok := skipDays[date]; ok { @@ -265,7 +327,16 @@ func ComputeTvlCumulative(w http.ResponseWriter, r *http.Request) { continue } - notional := asset.Amount * asset.TokenPrice + // asset.TokenPrice is the price that was fetched when this token was last transferred, possibly before this date + // prefer to use the cached price for this date if it's available, because it might be newer + tokenPrice := asset.TokenPrice + if prices, ok := coinGeckoPriceCache[date]; ok { + if price, ok := prices[asset.CoinGeckoId]; ok { + // use the cached price + tokenPrice = price + } + } + notional := asset.Amount * tokenPrice if notional <= 0 { continue } diff --git a/event_database/cloud_functions/notional-tvl.go b/event_database/cloud_functions/notional-tvl.go index 2a074de31..502cbb4cb 100644 --- a/event_database/cloud_functions/notional-tvl.go +++ b/event_database/cloud_functions/notional-tvl.go @@ -306,6 +306,9 @@ func ComputeTVL(w http.ResponseWriter, r *http.Request) { ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) defer cancel() + now := time.Now().UTC() + todaysDateStr := now.Format("2006-01-02") + getNotionalAmounts := func(ctx context.Context, tokensLocked map[string]map[string]LockedAsset) map[string]map[string]LockedAsset { // create a map of all the coinIds seenCoinIds := map[string]bool{} @@ -340,6 +343,9 @@ func ComputeTVL(w http.ResponseWriter, r *http.Request) { Address: "*", } for address, lockedAsset := range tokens { + if !isTokenActive(chain, address, todaysDateStr) { + continue + } coinId := lockedAsset.CoinGeckoId amount := lockedAsset.Amount @@ -391,7 +397,6 @@ func ComputeTVL(w http.ResponseWriter, r *http.Request) { wg.Add(1) go func() { last24HourInterval := -time.Duration(24) * time.Hour - now := time.Now().UTC() start := now.Add(last24HourInterval) defer wg.Done() transfers := tvlForInterval(tbl, ctx, start, now) diff --git a/event_database/cloud_functions/shared.go b/event_database/cloud_functions/shared.go index 5081fa434..abb791cea 100644 --- a/event_database/cloud_functions/shared.go +++ b/event_database/cloud_functions/shared.go @@ -9,6 +9,7 @@ import ( "log" "math" "os" + "strconv" "strings" "sync" "time" @@ -264,6 +265,10 @@ func chainIdStringToType(chainId string) vaa.ChainID { return vaa.ChainIDUnset } +func chainIDToNumberString(c vaa.ChainID) string { + return strconv.FormatUint(uint64(c), 10) +} + func makeSummary(row bigtable.Row) *Summary { summary := &Summary{} if _, ok := row[messagePubFam]; ok { @@ -460,3 +465,19 @@ func isTokenAllowed(chainId string, tokenAddress string) bool { } return false } + +// tokens with no trading activity recorded by exchanges integrated on CoinGecko since the specified date +var inactiveTokens = map[string]map[string]string{ + chainIDToNumberString(vaa.ChainIDEthereum): { + "0x707f9118e33a9b8998bea41dd0d46f38bb963fc8": "2022-06-15", // Anchor bETH token + }, +} + +func isTokenActive(chainId string, tokenAddress string, date string) bool { + if deactivatedDates, ok := inactiveTokens[chainId]; ok { + if deactivatedDate, ok := deactivatedDates[tokenAddress]; ok { + return date < deactivatedDate + } + } + return true +}