pyth-crosschain/event_database/cloud_functions/addresses-transferred-to.go

504 lines
16 KiB
Go

// Package p contains an HTTP Cloud Function.
package p
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strconv"
"sync"
"time"
"cloud.google.com/go/bigtable"
)
type addressesResult struct {
Last24HoursAmounts map[string]map[string]float64
Last24HoursCounts map[string]int
WithinPeriodAmounts map[string]map[string]float64
WithinPeriodCounts map[string]int
PeriodDurationDays int
DailyAmounts map[string]map[string]map[string]float64
DailyCounts map[string]map[string]int
}
// an in-memory cache of previously calculated results
var warmAddressesCache = map[string]map[string]map[string]map[string]float64{}
var muWarmAddressesCache sync.RWMutex
var warmAddressesCacheFilePath = "/addresses-transferred-to-cache.json"
type AddressData struct {
TokenSymbol string
TokenAmount float64
OriginChain string
LeavingChain string
DestinationChain string
DestinationAddress string
Notional float64
}
func fetchAddressRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) []AddressData {
rows := []AddressData{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
t := &AddressData{}
if _, ok := row[transferDetailsFam]; ok {
for _, item := range row[transferDetailsFam] {
switch item.Column {
case "TokenTransferDetails:Amount":
amount, _ := strconv.ParseFloat(string(item.Value), 64)
t.TokenAmount = amount
case "TokenTransferDetails:NotionalUSD":
reader := bytes.NewReader(item.Value)
var notionalFloat float64
if err := binary.Read(reader, binary.BigEndian, &notionalFloat); err != nil {
log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
}
t.Notional = notionalFloat
case "TokenTransferDetails:OriginSymbol":
t.TokenSymbol = string(item.Value)
}
}
if _, ok := row[transferPayloadFam]; ok {
for _, item := range row[transferPayloadFam] {
switch item.Column {
case "TokenTransferPayload:OriginChain":
t.OriginChain = string(item.Value)
case "TokenTransferPayload:TargetChain":
t.DestinationChain = string(item.Value)
case "TokenTransferPayload:TargetAddress":
t.DestinationAddress = string(item.Value)
}
}
t.DestinationAddress = transformHexAddressToNative(chainIdStringToType(t.DestinationChain), t.DestinationAddress)
}
t.LeavingChain = row.Key()[:1]
rows = append(rows, *t)
}
return true
}, bigtable.RowFilter(
bigtable.ConditionFilter(
bigtable.ChainFilters(
bigtable.FamilyFilter(columnFamilies[1]),
bigtable.CellsPerRowLimitFilter(1), // only the first cell in column
bigtable.TimestampRangeFilter(start, end), // within time range
bigtable.StripValueFilter(), // no columns/values, just the row.Key()
),
bigtable.ChainFilters(
bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain|TargetAddress"),
bigtable.LatestNFilter(1),
),
bigtable.BlockAllFilter(),
),
))
if err != nil {
log.Fatalln("failed reading rows to create RowList.", err)
}
return rows
}
// finds unique addresses tokens have been sent to, for each day since the start time passed in.
func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
results := map[string]map[string]map[string]float64{}
now := time.Now().UTC()
numPrevDays := int(now.Sub(start).Hours() / 24)
var intervalsWG sync.WaitGroup
// there will be a query for each previous day, plus today
intervalsWG.Add(numPrevDays + 1)
// create the unique identifier for this query, for cache
cachePrefix := createCachePrefix(prefix)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
// start is the SOD, end is EOD
// "0 daysAgo start" is 00:00:00 AM of the current day
// "0 daysAgo end" is 23:59:59 of the current day (the future)
// calulate the start and end times for the query
hoursAgo := (24 * daysAgo)
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
n := now.Add(daysAgoDuration)
year := n.Year()
month := n.Month()
day := n.Day()
loc := n.Location()
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
dateStr := start.Format("2006-01-02")
muWarmAddressesCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]map[string]float64{}
// check to see if there is cache data for this date/query
if dates, ok := warmAddressesCache[cachePrefix]; ok {
// have a cache for this query
if dateCache, ok := dates[dateStr]; ok {
// have a cache for this date
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = dateCache
muWarmAddressesCache.Unlock()
intervalsWG.Done()
return
}
}
} else {
// no cache for this query, initialize the map
warmAddressesCache[cachePrefix] = map[string]map[string]map[string]float64{}
}
muWarmAddressesCache.Unlock()
defer intervalsWG.Done()
queryResult := fetchAddressRowsInInterval(tbl, ctx, prefix, start, end)
// iterate through the rows and increment the count
for _, row := range queryResult {
if _, ok := results[dateStr][row.DestinationChain]; !ok {
results[dateStr][row.DestinationChain] = map[string]float64{}
}
results[dateStr][row.DestinationChain][row.DestinationAddress] = results[dateStr][row.DestinationChain][row.DestinationAddress] + row.Notional
}
if daysAgo >= 1 {
// set the result in the cache
muWarmAddressesCache.Lock()
if _, ok := warmAddressesCache[cachePrefix][dateStr]; !ok {
// cache does not have this date, persist it for other instances.
warmAddressesCache[cachePrefix][dateStr] = results[dateStr]
cacheNeedsUpdate = true
}
muWarmAddressesCache.Unlock()
}
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmAddressesCacheFilePath, &muWarmAddressesCache, warmAddressesCache)
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same keys
seenChainSet := map[string]bool{}
for _, chains := range results {
for leaving := range chains {
seenChainSet[leaving] = true
}
}
// ensure each chain object has all the same symbol keys:
for date := range results {
for chain := range seenChainSet {
// check that date has all the chains
if _, ok := results[date][chain]; !ok {
results[date][chain] = map[string]float64{}
}
}
}
return results
}
// finds all the unique addresses that have received tokens since a particular moment.
func addressesTransferredToSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
result := map[string]map[string]float64{}
// fetch data for days not in the cache
dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, start)
// loop through the query results to combine cache + fresh data
for _, chains := range dailyAddresses {
for chain, addresses := range chains {
// ensure the chain exists in the result map
if _, ok := result[chain]; !ok {
result[chain] = map[string]float64{}
}
for address, amount := range addresses {
if _, ok := result[chain][address]; !ok {
result[chain][address] = 0
}
// add the amount the address received this day to the
// amount already in the result (amount the address has recieved so far)
result[chain][address] = result[chain][address] + amount
}
}
}
return result
}
// returns addresses that received tokens within the specified time range
func addressesForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]float64 {
// query for all rows in time range, return result count
queryResult := fetchAddressRowsInInterval(tbl, ctx, prefix, start, end)
result := map[string]map[string]float64{}
// iterate through the rows and increment the count for each index
for _, row := range queryResult {
if _, ok := result[row.DestinationChain]; !ok {
result[row.DestinationChain] = map[string]float64{}
}
result[row.DestinationChain][row.DestinationAddress] = result[row.DestinationChain][row.DestinationAddress] + row.Notional
}
return result
}
// find the addresses tokens have been transferred to
func AddressesTransferredTo(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, forChain, forAddress, daily, last24Hours, forPeriod, counts, amounts string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
numDays = queryParams.Get("numDays")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
daily = queryParams.Get("daily")
last24Hours = queryParams.Get("last24Hours")
forPeriod = queryParams.Get("forPeriod")
counts = queryParams.Get("counts")
amounts = queryParams.Get("amounts")
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
Daily string `json:"daily"`
Last24Hours string `json:"last24Hours"`
ForPeriod string `json:"forPeriod"`
Counts string `json:"counts"`
Amounts string `json:"amounts"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
numDays = d.NumDays
forChain = d.ForChain
forAddress = d.ForAddress
daily = d.Daily
last24Hours = d.Last24Hours
forPeriod = d.ForPeriod
counts = d.Counts
amounts = d.Amounts
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
if daily == "" && last24Hours == "" && forPeriod == "" {
// none of the options were set, so set one
last24Hours = "true"
}
if counts == "" && amounts == "" {
// neither of the options were set, so set one
counts = "true"
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
fmt.Fprint(w, "numDays must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create the rowkey prefix for querying
prefix := ""
if forChain != "" {
prefix = forChain
// if the request is forChain, always groupBy chain
if forAddress != "" {
// if the request is forAddress, always groupBy address
prefix = forChain + ":" + forAddress
}
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
// total of last 24 hours
last24HourAmounts := map[string]map[string]float64{}
last24HourCounts := map[string]int{}
if last24Hours != "" {
wg.Add(1)
go func(prefix string) {
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourAddresses := addressesForInterval(tbl, ctx, prefix, start, now)
if amounts != "" {
for chain, addresses := range last24HourAddresses {
last24HourAmounts[chain] = map[string]float64{}
for address, amount := range addresses {
last24HourAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
if counts != "" {
for chain, addresses := range last24HourAddresses {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
last24HourCounts[chain] = numAddresses
last24HourCounts["*"] = last24HourCounts["*"] + numAddresses
}
}
}(prefix)
}
// total of the last numDays
addressesDailyAmounts := map[string]map[string]float64{}
addressesDailyCounts := map[string]int{}
if forPeriod != "" {
wg.Add(1)
go func(prefix string) {
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
// periodAmounts, err := addressesTransferredToSince(tbl, ctx, prefix, start)
periodAmounts := addressesTransferredToSinceDate(tbl, ctx, prefix, start)
if amounts != "" {
for chain, addresses := range periodAmounts {
addressesDailyAmounts[chain] = map[string]float64{}
for address, amount := range addresses {
addressesDailyAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
if counts != "" {
for chain, addresses := range periodAmounts {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
addressesDailyCounts[chain] = numAddresses
addressesDailyCounts["*"] = addressesDailyCounts["*"] + numAddresses
}
}
}(prefix)
}
// daily totals
dailyAmounts := map[string]map[string]map[string]float64{}
dailyCounts := map[string]map[string]int{}
if daily != "" {
wg.Add(1)
go func(prefix string, queryDays int) {
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
dailyTotals := createAddressesOfInterval(tbl, ctx, prefix, start)
if amounts != "" {
for date, chains := range dailyTotals {
dailyAmounts[date] = map[string]map[string]float64{}
for chain, addresses := range chains {
dailyAmounts[date][chain] = map[string]float64{}
for address, amount := range addresses {
dailyAmounts[date][chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
}
if counts != "" {
for date, chains := range dailyTotals {
dailyCounts[date] = map[string]int{}
for chain, addresses := range chains {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
dailyCounts[date][chain] = numAddresses
dailyCounts[date]["*"] = dailyCounts[date]["*"] + numAddresses
}
}
}
}(prefix, queryDays)
}
wg.Wait()
result := &addressesResult{
Last24HoursAmounts: last24HourAmounts,
Last24HoursCounts: last24HourCounts,
WithinPeriodAmounts: addressesDailyAmounts,
WithinPeriodCounts: addressesDailyCounts,
PeriodDurationDays: queryDays,
DailyAmounts: dailyAmounts,
DailyCounts: dailyCounts,
}
w.WriteHeader(http.StatusOK)
w.Header().Add("Content-Type", "application/json")
json.NewEncoder(w).Encode(result)
}