diff --git a/node/pkg/governor/governor.go b/node/pkg/governor/governor.go index 056bac013..cc799f5ab 100644 --- a/node/pkg/governor/governor.go +++ b/node/pkg/governor/governor.go @@ -126,7 +126,7 @@ type ChainGovernor struct { msgsSeen map[string]bool // Key is hash, payload is consts transferComplete and transferEnqueued. msgsToPublish []*common.MessagePublication dayLengthInMinutes int - coinGeckoQuery string + coinGeckoQueries []string env int nextStatusPublishTime time.Time nextConfigPublishTime time.Time diff --git a/node/pkg/governor/governor_prices.go b/node/pkg/governor/governor_prices.go index 120d79b6a..d6d7c9585 100644 --- a/node/pkg/governor/governor_prices.go +++ b/node/pkg/governor/governor_prices.go @@ -26,33 +26,31 @@ import ( // The CoinGecko API is documented here: https://www.coingecko.com/en/api/documentation // An example of the query to be generated: https://api.coingecko.com/api/v3/simple/price?ids=gemma-extending-tech,bitcoin,weth&vs_currencies=usd +// coinGeckoQueryIntervalInMins specifies how often we query CoinGecko for prices. const coinGeckoQueryIntervalInMins = 15 -func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error { - ids := "" - first := true - for coinGeckoId := range gov.tokensByCoinGeckoId { - if first { - first = false - } else { - ids += "," - } +// tokensPerCoinGeckoQuery specifies how many tokens will be in each CoinGecko query. The token list will be broken up into chunks of this size. +const tokensPerCoinGeckoQuery = 200 - ids += coinGeckoId +// initCoinGecko builds the set of CoinGecko queries that will be used to update prices. It also starts a go routine to periodically do the queries. +func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error { + // Create a slice of all the CoinGecko IDs so we can create the corresponding queries. + ids := make([]string, 0, len(gov.tokensByCoinGeckoId)) + for id := range gov.tokensByCoinGeckoId { + ids = append(ids, id) } - params := url.Values{} - params.Add("ids", ids) - params.Add("vs_currencies", "usd") + // Create the set of queries, breaking the IDs into the appropriate size chunks. + gov.coinGeckoQueries = createCoinGeckoQueries(ids, tokensPerCoinGeckoQuery) + for queryIdx, query := range gov.coinGeckoQueries { + gov.logger.Info("cgov: coingecko query: ", zap.Int("queryIdx", queryIdx), zap.String("query", query)) + } - if first { + if len(gov.coinGeckoQueries) == 0 { gov.logger.Info("cgov: did not find any tokens, nothing to do!") return nil } - gov.coinGeckoQuery = "https://api.coingecko.com/api/v3/simple/price?" + params.Encode() - gov.logger.Info("cgov: coingecko query: ", zap.String("query", gov.coinGeckoQuery)) - if run { if err := supervisor.Run(ctx, "govpricer", gov.PriceQuery); err != nil { return err @@ -62,6 +60,48 @@ func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error { return nil } +// createCoinGeckoQueries creates the set of CoinGecko queries, breaking the set of IDs into the appropriate size chunks. +func createCoinGeckoQueries(idList []string, tokensPerQuery int) []string { + var queries []string + queryIdx := 0 + tokenIdx := 0 + ids := "" + first := true + for _, coinGeckoId := range idList { + if tokenIdx%tokensPerQuery == 0 && tokenIdx != 0 { + queries = append(queries, createCoinGeckoQuery(ids)) + ids = "" + first = true + queryIdx += 1 + } + if first { + first = false + } else { + ids += "," + } + + ids += coinGeckoId + tokenIdx += 1 + } + + if ids != "" { + queries = append(queries, createCoinGeckoQuery(ids)) + } + + return queries +} + +// createCoinGeckoQuery creates a CoinGecko query for the specified set of IDs. +func createCoinGeckoQuery(ids string) string { + params := url.Values{} + params.Add("ids", ids) + params.Add("vs_currencies", "usd") + + query := "https://api.coingecko.com/api/v3/simple/price?" + params.Encode() + return query +} + +// PriceQuery is the entry point for the routine that periodically queries CoinGecko for prices. func (gov *ChainGovernor) PriceQuery(ctx context.Context) error { // Do a query immediately, then once each interval. // We ignore the error because an error would already have been logged, and we don't want to bring down the @@ -81,50 +121,25 @@ func (gov *ChainGovernor) PriceQuery(ctx context.Context) error { } } -// queryCoinGecko sends a query to the CoinGecko server to get the latest prices. It can +// queryCoinGecko sends a series of of one or more queries to the CoinGecko server to get the latest prices. It can // return an error, but that is only used by the tool that validates the query. In the actual governor, // it just logs the error and we will try again next interval. If an error happens, any tokens that have // not been updated will be assigned their pre-configured price. func (gov *ChainGovernor) queryCoinGecko() error { - response, err := http.Get(gov.coinGeckoQuery) - if err != nil { - gov.logger.Error("cgov: failed to query coin gecko, reverting to configured prices", zap.String("query", gov.coinGeckoQuery), zap.Error(err)) - gov.revertAllPrices() - return fmt.Errorf("failed to query CoinGecko") - } - - defer func() { - err = response.Body.Close() + result := make(map[string]interface{}) + for queryIdx, query := range gov.coinGeckoQueries { + thisResult, err := gov.queryCoinGeckoChunk(query) if err != nil { - gov.logger.Error("cgov: failed to close coin gecko query") - // We can't safely call revertAllPrices() here because we don't know if we hold the lock or not. - // Also, we don't need to because the prices have already been updated / reverted by this point. + gov.logger.Error("cgov: CoinGecko query failed", zap.Int("queryIdx", queryIdx), zap.String("query", query), zap.Error(err)) + gov.revertAllPrices() + return err } - }() - responseData, err := io.ReadAll(response.Body) - if err != nil { - gov.logger.Error("cgov: failed to parse coin gecko response, reverting to configured prices", zap.Error(err)) - gov.revertAllPrices() - return fmt.Errorf("failed to parse CoinGecko response") - } + for key, value := range thisResult { + result[key] = value + } - resp := string(responseData) - if strings.Contains(resp, "error_code") { - gov.logger.Error("cgov: coin gecko query failed, reverting to configured prices", - zap.String("response", resp), - zap.String("query", gov.coinGeckoQuery), - ) - - gov.revertAllPrices() - return fmt.Errorf("coin gecko query failed: %s", resp) - } - - var result map[string]interface{} - if err := json.Unmarshal(responseData, &result); err != nil { - gov.logger.Error("cgov: failed to unmarshal coin gecko json, reverting to configured prices", zap.Error(err)) - gov.revertAllPrices() - return fmt.Errorf("failed to unmarshal json") + time.Sleep(1 * time.Second) } now := time.Now() @@ -146,7 +161,7 @@ func (gov *ChainGovernor) queryCoinGecko() error { var ok bool price, ok = m["usd"].(float64) if !ok { - gov.logger.Error("cgov: failed to parse coin gecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId)) + gov.logger.Error("cgov: failed to parse CoinGecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId)) // By continuing, we leave this one in the local map so the price will get reverted below. continue } @@ -178,11 +193,48 @@ func (gov *ChainGovernor) queryCoinGecko() error { // Don't update the timestamp so we'll know when we last received an update from CoinGecko. } } + + return fmt.Errorf("cgov: failed to update prices for some tokens") } return nil } +// queryCoinGeckoChunk sends a single CoinGecko query and returns the result. +func (gov *ChainGovernor) queryCoinGeckoChunk(query string) (map[string]interface{}, error) { + var result map[string]interface{} + + gov.logger.Debug("cgov: executing CoinGecko query", zap.String("query", query)) + response, err := http.Get(query) //nolint:gosec + if err != nil { + return result, fmt.Errorf("failed to query CoinGecko: %w", err) + } + + defer func() { + err = response.Body.Close() + if err != nil { + gov.logger.Error("cgov: failed to close CoinGecko query: %w", zap.Error(err)) + } + }() + + responseData, err := io.ReadAll(response.Body) + if err != nil { + return result, fmt.Errorf("failed to read CoinGecko response: %w", err) + } + + resp := string(responseData) + if strings.Contains(resp, "error_code") { + return result, fmt.Errorf("CoinGecko query failed: %s", resp) + } + + if err := json.Unmarshal(responseData, &result); err != nil { + return result, fmt.Errorf("failed to unmarshal CoinGecko json: %w", err) + } + + return result, nil +} + +// revertAllPrices reverts the price of all tokens to the configured prices. It is used when a CoinGecko query fails. func (gov *ChainGovernor) revertAllPrices() { gov.mutex.Lock() defer gov.mutex.Unlock() @@ -202,7 +254,7 @@ func (gov *ChainGovernor) revertAllPrices() { } } -// We should use the max(coinGeckoPrice, configuredPrice) as our price for computing notional value. +// updatePrice updates the price of a single token. We should use the max(coinGeckoPrice, configuredPrice) as our price for computing notional value. func (te tokenEntry) updatePrice() { if (te.coinGeckoPrice == nil) || (te.coinGeckoPrice.Cmp(te.cfgPrice) < 0) { te.price.Set(te.cfgPrice) @@ -211,6 +263,7 @@ func (te tokenEntry) updatePrice() { } } +// CheckQuery is a free function used to test that the CoinGecko query still works after the mainnet token list has been updated. func CheckQuery(logger *zap.Logger) error { logger.Info("Instantiating governor.") ctx := context.Background() @@ -221,16 +274,16 @@ func CheckQuery(logger *zap.Logger) error { return err } - logger.Info("Building Coin Gecko query.") + logger.Info("Building CoinGecko query.") if err := gov.initCoinGecko(ctx, false); err != nil { return err } - logger.Info("Initiating Coin Gecko query.") + logger.Info("Initiating CoinGecko query.") if err := gov.queryCoinGecko(); err != nil { return err } - logger.Info("Coin Gecko query complete.") + logger.Info("CoinGecko query complete.") return nil } diff --git a/node/pkg/governor/governor_test.go b/node/pkg/governor/governor_test.go index d5ab50060..7ce27b85f 100644 --- a/node/pkg/governor/governor_test.go +++ b/node/pkg/governor/governor_test.go @@ -6,6 +6,8 @@ import ( "fmt" "math" "math/big" + "net/url" + "strings" "testing" "time" @@ -1719,3 +1721,73 @@ func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) { assert.Equal(t, uint64(0), valuePending) assert.Equal(t, 2, len(gov.msgsSeen)) } + +func getIdsFromCoinGeckoQuery(t *testing.T, query string) []string { + unescaped, err := url.QueryUnescape(query) + require.NoError(t, err) + + fields := strings.Split(unescaped, "?") + require.Equal(t, 2, len(fields)) + + u, err := url.ParseQuery(fields[1]) + require.NoError(t, err) + + idField, exists := u["ids"] + require.Equal(t, true, exists) + require.Equal(t, 1, len(idField)) + + return strings.Split(idField[0], ",") +} + +func TestCoinGeckoQueries(t *testing.T) { + type testCase struct { + desc string + numIds int + chunkSize int + expectedQueries int + } + + tests := []testCase{ + {numIds: 0, chunkSize: 100, expectedQueries: 0, desc: "Zero queries"}, + {numIds: 42, chunkSize: 100, expectedQueries: 1, desc: "Easily fits in one"}, + {numIds: 100, chunkSize: 100, expectedQueries: 1, desc: "Exactly fits in one"}, + {numIds: 242, chunkSize: 207, expectedQueries: 2, desc: "Easily fits in two"}, + {numIds: 414, chunkSize: 207, expectedQueries: 2, desc: "Exactly fits in two"}, + {numIds: 5001, chunkSize: 207, expectedQueries: 25, desc: "A bunch of queries"}, + } + + for _, tc := range tests { + t.Run(tc.desc, func(t *testing.T) { + ids := make([]string, tc.numIds) + for idx := 0; idx < tc.numIds; idx++ { + ids[idx] = fmt.Sprintf("id%d", idx) + } + + queries := createCoinGeckoQueries(ids, tc.chunkSize) + require.Equal(t, tc.expectedQueries, len(queries)) + + results := make(map[string]string) + for _, query := range queries { + idsInQuery := getIdsFromCoinGeckoQuery(t, query) + require.GreaterOrEqual(t, tc.chunkSize, len(idsInQuery)) + for _, id := range idsInQuery { + results[id] = id + } + } + + require.Equal(t, tc.numIds, len(results)) + + for _, id := range ids { + if _, exists := results[id]; !exists { + assert.Equal(t, "id not found in query", id) + } + delete(results, id) + } + if len(results) != 0 { + for id := range results { + assert.Equal(t, "bogus id created by query", id) + } + } + }) + } +}