some tweaks to prices.go (GetZECPrice, GetCurrentZECPrice)

Simplified locking; there is no need for a read-write mutex, this code
is not performance-critical. Removed some of the go routines because
they're not needed and the locking and concurrency are easier to reason
about without them.

NOTE: there is some test code left in here, search for "XXX testing" and
remove before committing! This test code makes things happen faster:
Instead of fetching prices every 15 minutes, do it every minute. Also,
write historical data every minute, instead of once per day.

Another change needed is to remove some of the logging. It's good for
now during testing, but it's too much for production.
This commit is contained in:
Larry Ruane 2022-04-25 21:04:05 -06:00
parent 5e4e4272ab
commit 2dd37cf119
5 changed files with 115 additions and 152 deletions

View File

@ -21,17 +21,17 @@ import (
)
var (
// Map of all historical prices. Date as "yyyy-mm-dd" to price in cents
// Map of all historical prices. Date as "yyyy-mm-dd" to price in USD
historicalPrices map[string]float64 = make(map[string]float64)
// The latest price
latestPrice float64 = -1
// Latest price was fetched at
latestPriceAt time.Time
latestPriceTime time.Time
// Mutex to control both historical and latest price
pricesRwMutex sync.RWMutex
// Mutex to guard both historical and latest price
mutex sync.Mutex
// Full path of the persistence file
pricesFileName string
@ -78,7 +78,6 @@ func fetchAPIPrice(url string, resultPath []string) (float64, error) {
func fetchCoinbasePrice() (float64, error) {
return fetchAPIPrice("https://api.coinbase.com/v2/exchange-rates?currency=ZEC", []string{"data", "rates", "USD"})
}
func fetchCoinCapPrice() (float64, error) {
@ -89,98 +88,86 @@ func fetchBinancePrice() (float64, error) {
return fetchAPIPrice("https://api.binance.com/api/v3/avgPrice?symbol=ZECUSDC", []string{"price"})
}
func fetchHistoricalCoingeckoPrice(ts *time.Time) (float64, error) {
func fetchHistoricalCoingeckoPrice(ts time.Time) (float64, error) {
dt := ts.Format("02-01-2006") // dd-mm-yyyy
url := fmt.Sprintf("https://api.coingecko.com/api/v3/coins/zcash/history?date=%s?id=zcash", dt)
url := fmt.Sprintf("https://api.coingecko.com/api/v3/coins/zcash/history?date=%s", dt)
return fetchAPIPrice(url, []string{"market_data", "current_price", "usd"})
}
// Median gets the median number in a slice of numbers
func median(inp []float64) (median float64) {
// Start by sorting a copy of the slice
sort.Float64s(inp)
// No math is needed if there are no numbers
// calcMedian calculates the median of a sorted slice of numbers
func calcMedian(inp []float64) (median float64) {
// For even numbers we add the two middle numbers
// and divide by two using the mean function above
// For odd numbers we just use the middle number
l := len(inp)
if l == 0 {
return -1
} else if l%2 == 0 {
return (inp[l/2-1] + inp[l/2]) / 2
n := len(inp)
if n%2 == 0 {
return (inp[n/2-1] + inp[n/2]) / 2
} else {
return inp[l/2]
return inp[n/2]
}
}
// fetchPriceFromWebAPI will fetch prices from multiple places, discard outliers and return the
// concensus price
// concensus price. This function doesn't need the mutex.
func fetchPriceFromWebAPI() (float64, error) {
// We'll fetch prices from all our endpoints, and use the median price from that
priceProviders := []func() (float64, error){fetchBinancePrice, fetchCoinCapPrice, fetchCoinbasePrice}
ch := make(chan float64)
// Get all prices
for _, provider := range priceProviders {
go func(provider func() (float64, error)) {
price, err := provider()
if err != nil {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"error": err,
}).Error("Service")
ch <- -1
} else {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"price": price,
}).Info("Service")
ch <- price
}
}(provider)
}
prices := make([]float64, 0)
for range priceProviders {
price := <-ch
if price > 0 {
for _, provider := range priceProviders {
price, err := provider()
if err == nil {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"price": price,
}).Info("Service")
prices = append(prices, price)
} else {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"provider": runtime.FuncForPC(reflect.ValueOf(provider).Pointer()).Name(),
"error": err,
}).Error("Service")
}
}
// sort
if len(prices) == 0 {
return -1, errors.New("no price providers are available")
}
sort.Float64s(prices)
// Get the median price
median1 := median(prices)
median := calcMedian(prices)
// Discard all values that are more than 20% outside the median
validPrices := make([]float64, 0)
for _, price := range prices {
if (math.Abs(price-median1) / median1) > 0.2 {
if (math.Abs(price-median) / median) > 0.2 {
Log.WithFields(logrus.Fields{
"method": "CurrentPrice",
"error": fmt.Sprintf("Discarding price (%.2f) because too far away from median (%.2f", price, median1),
"error": fmt.Sprintf("Discarding price (%.2f) because too far away from median (%.2f", price, median),
}).Error("Service")
} else {
validPrices = append(validPrices, price)
}
}
// At least 2 (valid) providers are required; we don't want to depend on just one
if len(validPrices) < 2 {
return -1, errors.New("insufficient price providers are available")
}
// If we discarded too many, return an error
if len(validPrices) < (len(prices)/2 + 1) {
return -1, errors.New("not enough valid prices")
} else {
return median(validPrices), nil
}
median = calcMedian(validPrices)
if median <= 0 {
return -1, errors.New("median price is <= 0")
}
return median, nil
}
func readHistoricalPricesFile() (map[string]float64, error) {
@ -217,132 +204,98 @@ func writeHistoricalPricesMap() {
defer f.Close()
j := gob.NewEncoder(f)
{
// Read lock
pricesRwMutex.RLock()
err = j.Encode(historicalPrices)
pricesRwMutex.RUnlock()
if err != nil {
Log.Errorf("Couldn't encode historical prices: %v", err)
return
}
if err = j.Encode(historicalPrices); err != nil {
Log.Errorf("Couldn't encode historical prices: %v", err)
return
}
Log.WithFields(logrus.Fields{
"method": "HistoricalPrice",
"action": "Wrote historical prices file",
}).Info("Service")
}
// GetCurrentPrice is a top-level API, returns the latest price that we
// have fetched if no more than 3 hours old, else an error. An error
// should not occur unless we can't reach enough price oracles.
func GetCurrentPrice() (float64, error) {
// Read lock
pricesRwMutex.RLock()
defer pricesRwMutex.RUnlock()
mutex.Lock()
defer mutex.Unlock()
if latestPriceTime.IsZero() {
return -1, errors.New("starting up, prices not available yet")
}
// If the current price is too old, don't return it.
if time.Since(latestPriceAt).Hours() > 3 {
if time.Since(latestPriceTime).Hours() > 3 {
return -1, errors.New("price too old")
}
return latestPrice, nil
}
func writeLatestPrice(price float64) {
{
// Read lock
pricesRwMutex.RLock()
// Check if the time has "rolled over", if yes then preserve the last price
// as the previous day's historical price
if latestPrice > 0 && latestPriceAt.Format("2006-01-02") != time.Now().Format("2006-01-02") {
// update the historical price.
// First, make a copy of the time
t := time.Unix(latestPriceAt.Unix(), 0)
go addHistoricalPrice(latestPrice, &t)
}
pricesRwMutex.RUnlock()
}
// Write lock
pricesRwMutex.Lock()
latestPrice = price
latestPriceAt = time.Now()
pricesRwMutex.Unlock()
// return the time in YYYY-MM-DD string format
func day(t time.Time) string {
return t.Format("2006-01-02")
}
func GetHistoricalPrice(ts *time.Time) (float64, *time.Time, error) {
dt := ts.Format("2006-01-02")
// GetHistoricalPrice returns the price for the given day, but only
// accurate to day granularity.
func GetHistoricalPrice(ts time.Time) (float64, *time.Time, error) {
dt := day(ts)
canonicalTime, err := time.Parse("2006-01-02", dt)
if err != nil {
return -1, nil, err
}
{
// Read lock
pricesRwMutex.RLock()
val, ok := historicalPrices[dt]
pricesRwMutex.RUnlock()
if ok {
return val, &canonicalTime, nil
}
mutex.Lock()
defer mutex.Unlock()
if val, ok := historicalPrices[dt]; ok {
return val, &canonicalTime, nil
}
{
// Check if this is the same as the current latest price
// Read lock
pricesRwMutex.RLock()
var price = latestPrice
var returnPrice = price > 0 && latestPriceAt.Format("2006-01-02") == dt
pricesRwMutex.RUnlock()
if returnPrice {
return price, &canonicalTime, nil
}
// Check if this is the same as the current latest price
if latestPrice > 0 && day(latestPriceTime) == dt {
return latestPrice, &canonicalTime, nil
}
// Fetch price from web API
mutex.Unlock()
price, err := fetchHistoricalCoingeckoPrice(ts)
mutex.Lock()
if err != nil {
Log.Errorf("Couldn't read historical prices from Coingecko: %v", err)
return -1, nil, err
}
if price <= 0 {
Log.Errorf("historical prices from Coingecko <= 0")
return -1, nil, errors.New("bad Coingecko result")
}
// add to our cache so we don't have to hit Coingecko again
// for the same date
addHistoricalPrice(price, ts)
go addHistoricalPrice(price, ts)
return price, &canonicalTime, err
return price, &canonicalTime, nil
}
func addHistoricalPrice(price float64, ts *time.Time) {
if price <= 0 {
return
}
dt := ts.Format("2006-01-02")
// Read Lock
pricesRwMutex.RLock()
_, ok := historicalPrices[dt]
pricesRwMutex.RUnlock()
if !ok {
// Write lock
pricesRwMutex.Lock()
// Add a price entry for the given day both to our map
// and to the file (so we'll have it after a restart).
// This caching allows us to hit coingecko less often,
// and provides resilience when that site is down.
//
// There are two ways a historical price can be added:
// - When a client calls GetZECPrice to get a past price
// - When a new day begins, we'll save the previous day's price
//
func addHistoricalPrice(price float64, ts time.Time) {
dt := day(ts)
if _, ok := historicalPrices[dt]; !ok {
// an entry for this day doesn't exist, add it
historicalPrices[dt] = price
defer pricesRwMutex.Unlock()
go Log.WithFields(logrus.Fields{
Log.WithFields(logrus.Fields{
"method": "HistoricalPrice",
"action": "Add",
"date": dt,
"price": price,
}).Info("Service")
go writeHistoricalPricesMap()
writeHistoricalPricesMap()
}
}
@ -352,14 +305,14 @@ func StartPriceFetcher(dbPath string, chainName string) {
pricesFileName = filepath.Join(dbPath, chainName, "prices")
// Read the historical prices if available
mutex.Lock()
if prices, err := readHistoricalPricesFile(); err != nil {
Log.Errorf("Couldn't read historical prices, starting with empty map")
} else {
// Write lock
pricesRwMutex.Lock()
historicalPrices = prices
pricesRwMutex.Unlock()
Log.Infof("prices at start: %v", prices)
}
mutex.Unlock()
// Fetch the current price every 15 mins
go func() {
@ -373,10 +326,20 @@ func StartPriceFetcher(dbPath string, chainName string) {
"price": price,
}).Info("Service")
writeLatestPrice(price)
mutex.Lock()
// If the day has changed, save the previous day's
// historical price. Historical prices are per-day.
if day(latestPriceTime) != day(time.Now()) {
if latestPrice > 0 {
t := time.Unix(latestPriceTime.Unix(), 0)
addHistoricalPrice(latestPrice, t)
}
}
latestPrice = price
latestPriceTime = time.Now()
mutex.Unlock()
}
// Refresh every
// price data 15 minutes out of date is probably okay
time.Sleep(15 * time.Minute)
}
}()

View File

@ -1404,7 +1404,7 @@ There is no staging or applying for these, very simple.</p></td>
<td>timestamp</td>
<td><a href="#uint64">uint64</a></td>
<td></td>
<td><p>List of timestamps(in sec) at which the price is being requested </p></td>
<td><p>timestamp (Unix time, seconds since 1970) at which the price is being requested </p></td>
</tr>
<tr>

View File

@ -183,7 +183,7 @@ func (s *lwdStreamer) GetZECPrice(ctx context.Context, in *walletrpc.PriceReques
}
ts := time.Unix(int64(in.Timestamp), 0)
price, timeFetched, err := common.GetHistoricalPrice(&ts)
price, timeFetched, err := common.GetHistoricalPrice(ts)
if err != nil {
return nil, err

View File

@ -1196,7 +1196,7 @@ type PriceRequest struct {
sizeCache protoimpl.SizeCache
unknownFields protoimpl.UnknownFields
// List of timestamps(in sec) at which the price is being requested
// timestamp (Unix time, seconds since 1970) at which the price is being requested
Timestamp uint64 `protobuf:"varint,1,opt,name=timestamp,proto3" json:"timestamp,omitempty"`
// 3 letter currency-code
Currency string `protobuf:"bytes,2,opt,name=currency,proto3" json:"currency,omitempty"`

View File

@ -138,7 +138,7 @@ message GetAddressUtxosReplyList {
}
message PriceRequest {
// List of timestamps(in sec) at which the price is being requested
// timestamp (Unix time, seconds since 1970) at which the price is being requested
uint64 timestamp = 1;
// 3 letter currency-code