Node/Gov: Split up CoinGecko queries (#2573)
* Node/Gov: Split up CoinGecko queries * Fix typos * Minor logging change * Add tests
This commit is contained in:
parent
56b847c41d
commit
4f1feb3899
|
@ -126,7 +126,7 @@ type ChainGovernor struct {
|
|||
msgsSeen map[string]bool // Key is hash, payload is consts transferComplete and transferEnqueued.
|
||||
msgsToPublish []*common.MessagePublication
|
||||
dayLengthInMinutes int
|
||||
coinGeckoQuery string
|
||||
coinGeckoQueries []string
|
||||
env int
|
||||
nextStatusPublishTime time.Time
|
||||
nextConfigPublishTime time.Time
|
||||
|
|
|
@ -26,33 +26,31 @@ import (
|
|||
// The CoinGecko API is documented here: https://www.coingecko.com/en/api/documentation
|
||||
// An example of the query to be generated: https://api.coingecko.com/api/v3/simple/price?ids=gemma-extending-tech,bitcoin,weth&vs_currencies=usd
|
||||
|
||||
// coinGeckoQueryIntervalInMins specifies how often we query CoinGecko for prices.
|
||||
const coinGeckoQueryIntervalInMins = 15
|
||||
|
||||
func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error {
|
||||
ids := ""
|
||||
first := true
|
||||
for coinGeckoId := range gov.tokensByCoinGeckoId {
|
||||
if first {
|
||||
first = false
|
||||
} else {
|
||||
ids += ","
|
||||
}
|
||||
// tokensPerCoinGeckoQuery specifies how many tokens will be in each CoinGecko query. The token list will be broken up into chunks of this size.
|
||||
const tokensPerCoinGeckoQuery = 200
|
||||
|
||||
ids += coinGeckoId
|
||||
// initCoinGecko builds the set of CoinGecko queries that will be used to update prices. It also starts a go routine to periodically do the queries.
|
||||
func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error {
|
||||
// Create a slice of all the CoinGecko IDs so we can create the corresponding queries.
|
||||
ids := make([]string, 0, len(gov.tokensByCoinGeckoId))
|
||||
for id := range gov.tokensByCoinGeckoId {
|
||||
ids = append(ids, id)
|
||||
}
|
||||
|
||||
params := url.Values{}
|
||||
params.Add("ids", ids)
|
||||
params.Add("vs_currencies", "usd")
|
||||
// Create the set of queries, breaking the IDs into the appropriate size chunks.
|
||||
gov.coinGeckoQueries = createCoinGeckoQueries(ids, tokensPerCoinGeckoQuery)
|
||||
for queryIdx, query := range gov.coinGeckoQueries {
|
||||
gov.logger.Info("cgov: coingecko query: ", zap.Int("queryIdx", queryIdx), zap.String("query", query))
|
||||
}
|
||||
|
||||
if first {
|
||||
if len(gov.coinGeckoQueries) == 0 {
|
||||
gov.logger.Info("cgov: did not find any tokens, nothing to do!")
|
||||
return nil
|
||||
}
|
||||
|
||||
gov.coinGeckoQuery = "https://api.coingecko.com/api/v3/simple/price?" + params.Encode()
|
||||
gov.logger.Info("cgov: coingecko query: ", zap.String("query", gov.coinGeckoQuery))
|
||||
|
||||
if run {
|
||||
if err := supervisor.Run(ctx, "govpricer", gov.PriceQuery); err != nil {
|
||||
return err
|
||||
|
@ -62,6 +60,48 @@ func (gov *ChainGovernor) initCoinGecko(ctx context.Context, run bool) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// createCoinGeckoQueries creates the set of CoinGecko queries, breaking the set of IDs into the appropriate size chunks.
|
||||
func createCoinGeckoQueries(idList []string, tokensPerQuery int) []string {
|
||||
var queries []string
|
||||
queryIdx := 0
|
||||
tokenIdx := 0
|
||||
ids := ""
|
||||
first := true
|
||||
for _, coinGeckoId := range idList {
|
||||
if tokenIdx%tokensPerQuery == 0 && tokenIdx != 0 {
|
||||
queries = append(queries, createCoinGeckoQuery(ids))
|
||||
ids = ""
|
||||
first = true
|
||||
queryIdx += 1
|
||||
}
|
||||
if first {
|
||||
first = false
|
||||
} else {
|
||||
ids += ","
|
||||
}
|
||||
|
||||
ids += coinGeckoId
|
||||
tokenIdx += 1
|
||||
}
|
||||
|
||||
if ids != "" {
|
||||
queries = append(queries, createCoinGeckoQuery(ids))
|
||||
}
|
||||
|
||||
return queries
|
||||
}
|
||||
|
||||
// createCoinGeckoQuery creates a CoinGecko query for the specified set of IDs.
|
||||
func createCoinGeckoQuery(ids string) string {
|
||||
params := url.Values{}
|
||||
params.Add("ids", ids)
|
||||
params.Add("vs_currencies", "usd")
|
||||
|
||||
query := "https://api.coingecko.com/api/v3/simple/price?" + params.Encode()
|
||||
return query
|
||||
}
|
||||
|
||||
// PriceQuery is the entry point for the routine that periodically queries CoinGecko for prices.
|
||||
func (gov *ChainGovernor) PriceQuery(ctx context.Context) error {
|
||||
// Do a query immediately, then once each interval.
|
||||
// We ignore the error because an error would already have been logged, and we don't want to bring down the
|
||||
|
@ -81,50 +121,25 @@ func (gov *ChainGovernor) PriceQuery(ctx context.Context) error {
|
|||
}
|
||||
}
|
||||
|
||||
// queryCoinGecko sends a query to the CoinGecko server to get the latest prices. It can
|
||||
// queryCoinGecko sends a series of of one or more queries to the CoinGecko server to get the latest prices. It can
|
||||
// return an error, but that is only used by the tool that validates the query. In the actual governor,
|
||||
// it just logs the error and we will try again next interval. If an error happens, any tokens that have
|
||||
// not been updated will be assigned their pre-configured price.
|
||||
func (gov *ChainGovernor) queryCoinGecko() error {
|
||||
response, err := http.Get(gov.coinGeckoQuery)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to query coin gecko, reverting to configured prices", zap.String("query", gov.coinGeckoQuery), zap.Error(err))
|
||||
gov.revertAllPrices()
|
||||
return fmt.Errorf("failed to query CoinGecko")
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err = response.Body.Close()
|
||||
result := make(map[string]interface{})
|
||||
for queryIdx, query := range gov.coinGeckoQueries {
|
||||
thisResult, err := gov.queryCoinGeckoChunk(query)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to close coin gecko query")
|
||||
// We can't safely call revertAllPrices() here because we don't know if we hold the lock or not.
|
||||
// Also, we don't need to because the prices have already been updated / reverted by this point.
|
||||
gov.logger.Error("cgov: CoinGecko query failed", zap.Int("queryIdx", queryIdx), zap.String("query", query), zap.Error(err))
|
||||
gov.revertAllPrices()
|
||||
return err
|
||||
}
|
||||
}()
|
||||
|
||||
responseData, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to parse coin gecko response, reverting to configured prices", zap.Error(err))
|
||||
gov.revertAllPrices()
|
||||
return fmt.Errorf("failed to parse CoinGecko response")
|
||||
}
|
||||
for key, value := range thisResult {
|
||||
result[key] = value
|
||||
}
|
||||
|
||||
resp := string(responseData)
|
||||
if strings.Contains(resp, "error_code") {
|
||||
gov.logger.Error("cgov: coin gecko query failed, reverting to configured prices",
|
||||
zap.String("response", resp),
|
||||
zap.String("query", gov.coinGeckoQuery),
|
||||
)
|
||||
|
||||
gov.revertAllPrices()
|
||||
return fmt.Errorf("coin gecko query failed: %s", resp)
|
||||
}
|
||||
|
||||
var result map[string]interface{}
|
||||
if err := json.Unmarshal(responseData, &result); err != nil {
|
||||
gov.logger.Error("cgov: failed to unmarshal coin gecko json, reverting to configured prices", zap.Error(err))
|
||||
gov.revertAllPrices()
|
||||
return fmt.Errorf("failed to unmarshal json")
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
now := time.Now()
|
||||
|
@ -146,7 +161,7 @@ func (gov *ChainGovernor) queryCoinGecko() error {
|
|||
var ok bool
|
||||
price, ok = m["usd"].(float64)
|
||||
if !ok {
|
||||
gov.logger.Error("cgov: failed to parse coin gecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId))
|
||||
gov.logger.Error("cgov: failed to parse CoinGecko response, reverting to configured price for this token", zap.String("coinGeckoId", coinGeckoId))
|
||||
// By continuing, we leave this one in the local map so the price will get reverted below.
|
||||
continue
|
||||
}
|
||||
|
@ -178,11 +193,48 @@ func (gov *ChainGovernor) queryCoinGecko() error {
|
|||
// Don't update the timestamp so we'll know when we last received an update from CoinGecko.
|
||||
}
|
||||
}
|
||||
|
||||
return fmt.Errorf("cgov: failed to update prices for some tokens")
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// queryCoinGeckoChunk sends a single CoinGecko query and returns the result.
|
||||
func (gov *ChainGovernor) queryCoinGeckoChunk(query string) (map[string]interface{}, error) {
|
||||
var result map[string]interface{}
|
||||
|
||||
gov.logger.Debug("cgov: executing CoinGecko query", zap.String("query", query))
|
||||
response, err := http.Get(query) //nolint:gosec
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("failed to query CoinGecko: %w", err)
|
||||
}
|
||||
|
||||
defer func() {
|
||||
err = response.Body.Close()
|
||||
if err != nil {
|
||||
gov.logger.Error("cgov: failed to close CoinGecko query: %w", zap.Error(err))
|
||||
}
|
||||
}()
|
||||
|
||||
responseData, err := io.ReadAll(response.Body)
|
||||
if err != nil {
|
||||
return result, fmt.Errorf("failed to read CoinGecko response: %w", err)
|
||||
}
|
||||
|
||||
resp := string(responseData)
|
||||
if strings.Contains(resp, "error_code") {
|
||||
return result, fmt.Errorf("CoinGecko query failed: %s", resp)
|
||||
}
|
||||
|
||||
if err := json.Unmarshal(responseData, &result); err != nil {
|
||||
return result, fmt.Errorf("failed to unmarshal CoinGecko json: %w", err)
|
||||
}
|
||||
|
||||
return result, nil
|
||||
}
|
||||
|
||||
// revertAllPrices reverts the price of all tokens to the configured prices. It is used when a CoinGecko query fails.
|
||||
func (gov *ChainGovernor) revertAllPrices() {
|
||||
gov.mutex.Lock()
|
||||
defer gov.mutex.Unlock()
|
||||
|
@ -202,7 +254,7 @@ func (gov *ChainGovernor) revertAllPrices() {
|
|||
}
|
||||
}
|
||||
|
||||
// We should use the max(coinGeckoPrice, configuredPrice) as our price for computing notional value.
|
||||
// updatePrice updates the price of a single token. We should use the max(coinGeckoPrice, configuredPrice) as our price for computing notional value.
|
||||
func (te tokenEntry) updatePrice() {
|
||||
if (te.coinGeckoPrice == nil) || (te.coinGeckoPrice.Cmp(te.cfgPrice) < 0) {
|
||||
te.price.Set(te.cfgPrice)
|
||||
|
@ -211,6 +263,7 @@ func (te tokenEntry) updatePrice() {
|
|||
}
|
||||
}
|
||||
|
||||
// CheckQuery is a free function used to test that the CoinGecko query still works after the mainnet token list has been updated.
|
||||
func CheckQuery(logger *zap.Logger) error {
|
||||
logger.Info("Instantiating governor.")
|
||||
ctx := context.Background()
|
||||
|
@ -221,16 +274,16 @@ func CheckQuery(logger *zap.Logger) error {
|
|||
return err
|
||||
}
|
||||
|
||||
logger.Info("Building Coin Gecko query.")
|
||||
logger.Info("Building CoinGecko query.")
|
||||
if err := gov.initCoinGecko(ctx, false); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("Initiating Coin Gecko query.")
|
||||
logger.Info("Initiating CoinGecko query.")
|
||||
if err := gov.queryCoinGecko(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger.Info("Coin Gecko query complete.")
|
||||
logger.Info("CoinGecko query complete.")
|
||||
return nil
|
||||
}
|
||||
|
|
|
@ -6,6 +6,8 @@ import (
|
|||
"fmt"
|
||||
"math"
|
||||
"math/big"
|
||||
"net/url"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
|
@ -1719,3 +1721,73 @@ func TestReusedMsgIdWithDifferentPayloadGetsProcessed(t *testing.T) {
|
|||
assert.Equal(t, uint64(0), valuePending)
|
||||
assert.Equal(t, 2, len(gov.msgsSeen))
|
||||
}
|
||||
|
||||
func getIdsFromCoinGeckoQuery(t *testing.T, query string) []string {
|
||||
unescaped, err := url.QueryUnescape(query)
|
||||
require.NoError(t, err)
|
||||
|
||||
fields := strings.Split(unescaped, "?")
|
||||
require.Equal(t, 2, len(fields))
|
||||
|
||||
u, err := url.ParseQuery(fields[1])
|
||||
require.NoError(t, err)
|
||||
|
||||
idField, exists := u["ids"]
|
||||
require.Equal(t, true, exists)
|
||||
require.Equal(t, 1, len(idField))
|
||||
|
||||
return strings.Split(idField[0], ",")
|
||||
}
|
||||
|
||||
func TestCoinGeckoQueries(t *testing.T) {
|
||||
type testCase struct {
|
||||
desc string
|
||||
numIds int
|
||||
chunkSize int
|
||||
expectedQueries int
|
||||
}
|
||||
|
||||
tests := []testCase{
|
||||
{numIds: 0, chunkSize: 100, expectedQueries: 0, desc: "Zero queries"},
|
||||
{numIds: 42, chunkSize: 100, expectedQueries: 1, desc: "Easily fits in one"},
|
||||
{numIds: 100, chunkSize: 100, expectedQueries: 1, desc: "Exactly fits in one"},
|
||||
{numIds: 242, chunkSize: 207, expectedQueries: 2, desc: "Easily fits in two"},
|
||||
{numIds: 414, chunkSize: 207, expectedQueries: 2, desc: "Exactly fits in two"},
|
||||
{numIds: 5001, chunkSize: 207, expectedQueries: 25, desc: "A bunch of queries"},
|
||||
}
|
||||
|
||||
for _, tc := range tests {
|
||||
t.Run(tc.desc, func(t *testing.T) {
|
||||
ids := make([]string, tc.numIds)
|
||||
for idx := 0; idx < tc.numIds; idx++ {
|
||||
ids[idx] = fmt.Sprintf("id%d", idx)
|
||||
}
|
||||
|
||||
queries := createCoinGeckoQueries(ids, tc.chunkSize)
|
||||
require.Equal(t, tc.expectedQueries, len(queries))
|
||||
|
||||
results := make(map[string]string)
|
||||
for _, query := range queries {
|
||||
idsInQuery := getIdsFromCoinGeckoQuery(t, query)
|
||||
require.GreaterOrEqual(t, tc.chunkSize, len(idsInQuery))
|
||||
for _, id := range idsInQuery {
|
||||
results[id] = id
|
||||
}
|
||||
}
|
||||
|
||||
require.Equal(t, tc.numIds, len(results))
|
||||
|
||||
for _, id := range ids {
|
||||
if _, exists := results[id]; !exists {
|
||||
assert.Equal(t, "id not found in query", id)
|
||||
}
|
||||
delete(results, id)
|
||||
}
|
||||
if len(results) != 0 {
|
||||
for id := range results {
|
||||
assert.Equal(t, "bogus id created by query", id)
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
|
Loading…
Reference in New Issue