BigTable: flat file cache + cumulative endpoints

Change-Id: If7e631b33866f077c60e0d397b02bfca7e91b05d

commit-id:89bb033e
justinschuldt 2021-12-13 00:09:03 -06:00 committed by Leopold Schabel
parent 6583771813
commit fa1252ace4
10 changed files with 1733 additions and 310 deletions

View File

@@ -2,8 +2,12 @@
.git
.gitignore
.vscode
cmd
*.md
*.txt
Dockerfile

View File

@@ -0,0 +1,394 @@
// Package p contains an HTTP Cloud Function.
package p
import (
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sort"
"strconv"
"sync"
"time"
"cloud.google.com/go/bigtable"
)
type cumulativeAddressesResult struct {
AllTimeAmounts map[string]map[string]float64
AllTimeCounts map[string]int
AllTimeDurationDays int
DailyAmounts map[string]map[string]map[string]float64
DailyCounts map[string]map[string]int
}
// an in-memory cache of previously calculated results
var warmCumulativeAddressesCache = map[string]map[string]map[string]map[string]float64{}
var muWarmCumulativeAddressesCache sync.RWMutex
var warmCumulativeAddressesCacheFilePath = "/addresses-transferred-to-cumulative-cache.json"
var addressesToUpToYesterday = map[string]map[string]map[string]map[string]float64{}
var muAddressesToUpToYesterday sync.RWMutex
var addressesToUpToYesterdayFilePath = "/addresses-transferred-to-up-to-yesterday-cache.json"
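Both caches above are flushed to a flat file with persistInterfaceToJson, which is not part of this diff. A minimal sketch of what such a helper can look like, with the signature inferred from the call sites below; the use of os.WriteFile (and therefore an os import) and the 0644 file mode are assumptions:

func persistInterfaceToJson(filePath string, mu *sync.RWMutex, obj interface{}) {
    // take a read lock so marshaling never races a concurrent cache write
    mu.RLock()
    jsonBytes, err := json.Marshal(obj)
    mu.RUnlock()
    if err != nil {
        log.Printf("failed marshaling cache for %v: %v", filePath, err)
        return
    }
    if err := os.WriteFile(filePath, jsonBytes, 0644); err != nil {
        log.Printf("failed writing cache file %v: %v", filePath, err)
    }
}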
// finds all the unique addresses that have received tokens since a particular moment.
func addressesTransferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
now := time.Now().UTC()
today := now.Format("2006-01-02")
oneDayAgo := -time.Duration(24) * time.Hour
yesterday := now.Add(oneDayAgo).Format("2006-01-02")
result := map[string]map[string]float64{}
// create the unique identifier for this query, for cache
cachePrefix := createCachePrefix(prefix)
muAddressesToUpToYesterday.Lock()
if _, ok := addressesToUpToYesterday[cachePrefix]; !ok {
addressesToUpToYesterday[cachePrefix] = map[string]map[string]map[string]float64{}
}
if cacheData, ok := addressesToUpToYesterday[cachePrefix][yesterday]; ok {
// cache has data through midnight yesterday
result = cacheData
// set the start to be the start of today
start = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
}
muAddressesToUpToYesterday.Unlock()
// fetch data for days not in the cache
dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, start)
// loop through the query results to combine cache + fresh data
for _, chains := range dailyAddresses {
for chain, addresses := range chains {
// ensure the chain exists in the result map
if _, ok := result[chain]; !ok {
result[chain] = map[string]float64{}
}
for address, amount := range addresses {
if _, ok := result[chain][address]; !ok {
result[chain][address] = 0
}
// add the amount the address received this day to the
// amount already in the result (amount the address has received so far)
result[chain][address] = result[chain][address] + amount
}
}
}
muAddressesToUpToYesterday.Lock()
if _, ok := addressesToUpToYesterday[cachePrefix][yesterday]; !ok {
// no cache, populate it. deep copy result into a fresh map first, so that
// subtracting today's amounts does not also mutate the map returned below.
upToYesterday := map[string]map[string]float64{}
for chain, addresses := range result {
upToYesterday[chain] = map[string]float64{}
for address, amount := range addresses {
upToYesterday[chain][address] = amount
}
}
for chain, addresses := range dailyAddresses[today] {
for address, amount := range addresses {
upToYesterday[chain][address] = upToYesterday[chain][address] - amount
}
}
addressesToUpToYesterday[cachePrefix][yesterday] = upToYesterday
muAddressesToUpToYesterday.Unlock()
// write cache to disk
persistInterfaceToJson(addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, addressesToUpToYesterday)
} else {
muAddressesToUpToYesterday.Unlock()
}
return result
}
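createCachePrefix is referenced throughout but defined elsewhere in the package. A plausible sketch, assuming an empty prefix is mapped to a wildcard so the unfiltered, all-chains query gets its own stable cache key:

func createCachePrefix(prefix string) string {
    if prefix == "" {
        // assumed placeholder key for the unfiltered query
        return "*"
    }
    return prefix
}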
// calculates a map of recipient address to notional value received, by chain, since the specified start time.
func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
now := time.Now().UTC()
today := now.Format("2006-01-02")
cachePrefix := createCachePrefix(prefix)
cacheNeedsUpdate := false
muWarmCumulativeAddressesCache.Lock()
if _, ok := warmCumulativeAddressesCache[cachePrefix]; !ok {
warmCumulativeAddressesCache[cachePrefix] = map[string]map[string]map[string]float64{}
}
muWarmCumulativeAddressesCache.Unlock()
results := map[string]map[string]map[string]float64{}
dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, releaseDay)
dateKeys := make([]string, 0, len(dailyAddresses))
for k := range dailyAddresses {
dateKeys = append(dateKeys, k)
}
sort.Strings(dateKeys)
// iterate through the dates in the result set, and accumulate the running
// total received by each destination address, by chain.
for i, date := range dateKeys {
muWarmCumulativeAddressesCache.RLock()
if dateCache, ok := warmCumulativeAddressesCache[cachePrefix][date]; ok && dateCache != nil {
// have a cached value for this day, use it.
results[date] = dateCache
muWarmCumulativeAddressesCache.RUnlock()
} else {
// no cached value for this day, must calculate it
muWarmCumulativeAddressesCache.RUnlock()
if i == 0 {
// special case for first day, no need to sum.
results[date] = dailyAddresses[date]
} else {
results[date] = map[string]map[string]float64{}
// find the string of the previous day
prevDate := dateKeys[i-1]
prevDayChains := results[prevDate]
thisDayChains := dailyAddresses[date]
for chain, thisDayAddresses := range thisDayChains {
// create a union of the addresses from this day, and previous days
addressUnion := map[string]string{}
for address := range prevDayChains[chain] {
addressUnion[address] = address
}
for address := range thisDayAddresses {
addressUnion[address] = address
}
// initialize the chain/address map for this date
if _, ok := results[date][chain]; !ok {
results[date][chain] = map[string]float64{}
}
// iterate through the union of addresses, computing a cumulative amount
// for each one, and adding it to the results.
for address := range addressUnion {
thisDayAmount := float64(0)
if amt, ok := thisDayAddresses[address]; ok {
thisDayAmount = amt
}
prevDayAmount := float64(0)
if prevAmount, ok := results[prevDate][chain][address]; ok && prevAmount != 0 {
prevDayAmount = prevAmount
}
cumulativeAmount := prevDayAmount + thisDayAmount
results[date][chain][address] = cumulativeAmount
}
}
}
// don't cache today
if date != today {
// set the result in the cache
muWarmCumulativeAddressesCache.Lock()
if _, ok := warmCumulativeAddressesCache[cachePrefix][date]; !ok {
// cache does not have this date, persist it for other instances.
warmCumulativeAddressesCache[cachePrefix][date] = results[date]
cacheNeedsUpdate = true
}
muWarmCumulativeAddressesCache.Unlock()
}
}
}
if cacheNeedsUpdate {
persistInterfaceToJson(warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, warmCumulativeAddressesCache)
}
selectDays := map[string]map[string]map[string]float64{}
days := getDaysInRange(start, now)
for _, day := range days {
selectDays[day] = results[day]
}
return selectDays
}
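getDaysInRange is likewise defined outside this diff. It is assumed to return every date from start through end, inclusive, formatted with the same "2006-01-02" layout the daily maps key on. A sketch under that assumption:

func getDaysInRange(start, end time.Time) []string {
    days := []string{}
    // both times are UTC, so stepping 24h at a time lands on successive days
    for d := start; !d.After(end); d = d.Add(24 * time.Hour) {
        days = append(days, d.Format("2006-01-02"))
    }
    return days
}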
// finds unique addresses that tokens have been transferred to.
func AddressesTransferredToCumulative(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, forChain, forAddress, daily, allTime, counts, amounts string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
numDays = queryParams.Get("numDays")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
daily = queryParams.Get("daily")
allTime = queryParams.Get("allTime")
counts = queryParams.Get("counts")
amounts = queryParams.Get("amounts")
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
Daily string `json:"daily"`
AllTime string `json:"allTime"`
Counts string `json:"counts"`
Amounts string `json:"amounts"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
numDays = d.NumDays
forChain = d.ForChain
forAddress = d.ForAddress
daily = d.Daily
allTime = d.AllTime
counts = d.Counts
amounts = d.Amounts
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
if daily == "" && allTime == "" {
// none of the options were set, so set one
allTime = "true"
}
if counts == "" && amounts == "" {
// neither of the options were set, so set one
counts = "true"
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
fmt.Fprint(w, "numDays must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create the rowkey prefix for querying
prefix := ""
if forChain != "" {
prefix = forChain
// restrict the query to the given chain
if forAddress != "" {
// restrict the query to the given address on that chain
prefix = forChain + ":" + forAddress
}
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
// total of the last numDays
addressesDailyAmounts := map[string]map[string]float64{}
addressesDailyCounts := map[string]int{}
allTimeDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
if allTime != "" {
wg.Add(1)
go func(prefix string) {
defer wg.Done()
periodAmounts := addressesTransferredToSince(tbl, ctx, prefix, releaseDay)
if amounts != "" {
for chain, addresses := range periodAmounts {
addressesDailyAmounts[chain] = map[string]float64{}
for address, amount := range addresses {
addressesDailyAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
if counts != "" {
for chain, addresses := range periodAmounts {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
addressesDailyCounts[chain] = numAddresses
addressesDailyCounts["*"] = addressesDailyCounts["*"] + numAddresses
}
}
}(prefix)
}
// daily totals
dailyAmounts := map[string]map[string]map[string]float64{}
dailyCounts := map[string]map[string]int{}
if daily != "" {
wg.Add(1)
go func(prefix string, queryDays int) {
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
dailyTotals := createCumulativeAddressesOfInterval(tbl, ctx, prefix, start)
if amounts != "" {
for date, chains := range dailyTotals {
dailyAmounts[date] = map[string]map[string]float64{}
for chain, addresses := range chains {
dailyAmounts[date][chain] = map[string]float64{}
for address, amount := range addresses {
dailyAmounts[date][chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
}
if counts != "" {
for date, chains := range dailyTotals {
dailyCounts[date] = map[string]int{}
for chain, addresses := range chains {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
dailyCounts[date][chain] = numAddresses
dailyCounts[date]["*"] = dailyCounts[date]["*"] + numAddresses
}
}
}
}(prefix, queryDays)
}
wg.Wait()
result := &cumulativeAddressesResult{
AllTimeAmounts: addressesDailyAmounts,
AllTimeCounts: addressesDailyCounts,
AllTimeDurationDays: allTimeDays,
DailyAmounts: dailyAmounts,
DailyCounts: dailyCounts,
}
jsonBytes, err := json.Marshal(result)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write(jsonBytes)
}
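For illustration, a client could invoke the deployed function like this; the URL is a placeholder, not the project's actual endpoint:

package main

import (
    "fmt"
    "io"
    "net/http"
    "strings"
)

func main() {
    // placeholder URL; substitute the real Cloud Function endpoint
    url := "https://REGION-PROJECT.cloudfunctions.net/AddressesTransferredToCumulative"
    body := strings.NewReader(`{"daily": "true", "numDays": "7", "amounts": "true"}`)
    resp, err := http.Post(url, "application/json", body)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    jsonBytes, _ := io.ReadAll(resp.Body)
    fmt.Println(string(jsonBytes))
}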

View File

@@ -0,0 +1,503 @@
// Package p contains an HTTP Cloud Function.
package p
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"strconv"
"sync"
"time"
"cloud.google.com/go/bigtable"
)
type addressesResult struct {
Last24HoursAmounts map[string]map[string]float64
Last24HoursCounts map[string]int
WithinPeriodAmounts map[string]map[string]float64
WithinPeriodCounts map[string]int
PeriodDurationDays int
DailyAmounts map[string]map[string]map[string]float64
DailyCounts map[string]map[string]int
}
// an in-memory cache of previously calculated results
var warmAddressesCache = map[string]map[string]map[string]map[string]float64{}
var muWarmAddressesCache sync.RWMutex
var warmAddressesCacheFilePath = "/addresses-transferred-to-cache.json"
type AddressData struct {
TokenSymbol string
TokenAmount float64
OriginChain string
LeavingChain string
DestinationChain string
DestinationAddress string
Notional float64
}
func fetchAddressRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) []AddressData {
rows := []AddressData{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
t := &AddressData{}
if _, ok := row[transferDetailsFam]; ok {
for _, item := range row[transferDetailsFam] {
switch item.Column {
case "TokenTransferDetails:Amount":
amount, _ := strconv.ParseFloat(string(item.Value), 64)
t.TokenAmount = amount
case "TokenTransferDetails:NotionalUSD":
reader := bytes.NewReader(item.Value)
var notionalFloat float64
if err := binary.Read(reader, binary.BigEndian, &notionalFloat); err != nil {
log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
}
t.Notional = notionalFloat
case "TokenTransferDetails:OriginSymbol":
t.TokenSymbol = string(item.Value)
}
}
if _, ok := row[transferPayloadFam]; ok {
for _, item := range row[transferPayloadFam] {
switch item.Column {
case "TokenTransferPayload:OriginChain":
t.OriginChain = string(item.Value)
case "TokenTransferPayload:TargetChain":
t.DestinationChain = string(item.Value)
case "TokenTransferPayload:TargetAddress":
t.DestinationAddress = string(item.Value)
}
}
t.DestinationAddress = transformHexAddressToNative(chainIdStringToType(t.DestinationChain), t.DestinationAddress)
}
t.LeavingChain = row.Key()[:1]
rows = append(rows, *t)
}
return true
}, bigtable.RowFilter(
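// ConditionFilter(cond, matched, unmatched): rows that have a cell in
// columnFamilies[1] within the [start, end) timestamp range are read with the
// second filter chain below; every other row is dropped by BlockAllFilter.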
bigtable.ConditionFilter(
bigtable.ChainFilters(
bigtable.FamilyFilter(columnFamilies[1]),
bigtable.CellsPerRowLimitFilter(1), // only the first cell in column
bigtable.TimestampRangeFilter(start, end), // within time range
bigtable.StripValueFilter(), // no columns/values, just the row.Key()
),
bigtable.ChainFilters(
bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain|TargetAddress"),
bigtable.LatestNFilter(1),
),
bigtable.BlockAllFilter(),
),
))
if err != nil {
log.Fatalln("failed reading rows to create RowList.", err)
}
return rows
}
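chainIdStringToType and transformHexAddressToNative are helpers defined elsewhere in the package; they convert the 32-byte, left-padded hex address stored in the transfer payload into the destination chain's native encoding. A rough illustration of the EVM case only, named differently here because it is an assumption about the helper's behavior, not its actual implementation:

// illustrative only: for EVM-style chains, take the last 20 bytes of the
// 32-byte padded value and render them with a 0x prefix
func evmAddressFromPaddedHex(hexAddr string) string {
    if len(hexAddr) == 64 {
        return "0x" + hexAddr[24:]
    }
    return hexAddr
}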
// finds unique addresses tokens have been sent to, for each day since the start time passed in.
func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
results := map[string]map[string]map[string]float64{}
now := time.Now().UTC()
numPrevDays := int(now.Sub(start).Hours() / 24)
var intervalsWG sync.WaitGroup
// there will be a query for each previous day, plus today
intervalsWG.Add(numPrevDays + 1)
// create the unique identifier for this query, for cache
cachePrefix := createCachePrefix(prefix)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
// start is the SOD (start of day), end is EOD (end of day)
// "0 daysAgo start" is 00:00:00 of the current day
// "0 daysAgo end" is 23:59:59 of the current day (still in the future)
// calculate the start and end times for the query
hoursAgo := (24 * daysAgo)
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
n := now.Add(daysAgoDuration)
year := n.Year()
month := n.Month()
day := n.Day()
loc := n.Location()
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
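// maxNano is defined elsewhere in this package; it is assumed to be
// 999999999, making end the final nanosecond of the day.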
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
dateStr := start.Format("2006-01-02")
muWarmAddressesCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]map[string]float64{}
// check to see if there is cache data for this date/query
if dates, ok := warmAddressesCache[cachePrefix]; ok {
// have a cache for this query
if dateCache, ok := dates[dateStr]; ok {
// have a cache for this date
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = dateCache
muWarmAddressesCache.Unlock()
intervalsWG.Done()
return
}
}
} else {
// no cache for this query, initialize the map
warmAddressesCache[cachePrefix] = map[string]map[string]map[string]float64{}
}
muWarmAddressesCache.Unlock()
defer intervalsWG.Done()
queryResult := fetchAddressRowsInInterval(tbl, ctx, prefix, start, end)
// iterate through the rows and accumulate the notional value received by each address
for _, row := range queryResult {
if _, ok := results[dateStr][row.DestinationChain]; !ok {
results[dateStr][row.DestinationChain] = map[string]float64{}
}
results[dateStr][row.DestinationChain][row.DestinationAddress] = results[dateStr][row.DestinationChain][row.DestinationAddress] + row.Notional
}
if daysAgo >= 1 {
// set the result in the cache
muWarmAddressesCache.Lock()
if _, ok := warmAddressesCache[cachePrefix][dateStr]; !ok {
// cache does not have this date, persist it for other instances.
warmAddressesCache[cachePrefix][dateStr] = results[dateStr]
cacheNeedsUpdate = true
}
muWarmAddressesCache.Unlock()
}
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmAddressesCacheFilePath, &muWarmAddressesCache, warmAddressesCache)
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same keys
seenChainSet := map[string]bool{}
for _, chains := range results {
for chain := range chains {
seenChainSet[chain] = true
}
}
// ensure each date has the same set of chain keys:
for date := range results {
for chain := range seenChainSet {
// check that date has all the chains
if _, ok := results[date][chain]; !ok {
results[date][chain] = map[string]float64{}
}
}
}
return results
}
// finds all the unique addresses that have received tokens since a particular moment.
func addressesTransferredToSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
result := map[string]map[string]float64{}
// fetch data for days not in the cache
dailyAddresses := createAddressesOfInterval(tbl, ctx, prefix, start)
// loop through the query results to combine cache + fresh data
for _, chains := range dailyAddresses {
for chain, addresses := range chains {
// ensure the chain exists in the result map
if _, ok := result[chain]; !ok {
result[chain] = map[string]float64{}
}
for address, amount := range addresses {
if _, ok := result[chain][address]; !ok {
result[chain][address] = 0
}
// add the amount the address received this day to the
// amount already in the result (amount the address has received so far)
result[chain][address] = result[chain][address] + amount
}
}
}
return result
}
// returns addresses that received tokens within the specified time range
func addressesForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]float64 {
// query for all rows in time range, return result count
queryResult := fetchAddressRowsInInterval(tbl, ctx, prefix, start, end)
result := map[string]map[string]float64{}
// iterate through the rows and accumulate the notional value received by each address
for _, row := range queryResult {
if _, ok := result[row.DestinationChain]; !ok {
result[row.DestinationChain] = map[string]float64{}
}
result[row.DestinationChain][row.DestinationAddress] = result[row.DestinationChain][row.DestinationAddress] + row.Notional
}
return result
}
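roundToTwoDecimalPlaces is shared by these handlers but is not part of this diff; a minimal sketch, assuming it simply rounds to cents (a math import would be needed):

func roundToTwoDecimalPlaces(num float64) float64 {
    return math.Round(num*100) / 100
}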
// find the addresses tokens have been transferred to
func AddressesTransferredTo(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, forChain, forAddress, daily, last24Hours, forPeriod, counts, amounts string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
numDays = queryParams.Get("numDays")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
daily = queryParams.Get("daily")
last24Hours = queryParams.Get("last24Hours")
forPeriod = queryParams.Get("forPeriod")
counts = queryParams.Get("counts")
amounts = queryParams.Get("amounts")
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
Daily string `json:"daily"`
Last24Hours string `json:"last24Hours"`
ForPeriod string `json:"forPeriod"`
Counts string `json:"counts"`
Amounts string `json:"amounts"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
numDays = d.NumDays
forChain = d.ForChain
forAddress = d.ForAddress
daily = d.Daily
last24Hours = d.Last24Hours
forPeriod = d.ForPeriod
counts = d.Counts
amounts = d.Amounts
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
if daily == "" && last24Hours == "" && forPeriod == "" {
// none of the options were set, so set one
last24Hours = "true"
}
if counts == "" && amounts == "" {
// neither of the options were set, so set one
counts = "true"
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
fmt.Fprint(w, "numDays must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create the rowkey prefix for querying
prefix := ""
if forChain != "" {
prefix = forChain
// restrict the query to the given chain
if forAddress != "" {
// restrict the query to the given address on that chain
prefix = forChain + ":" + forAddress
}
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
// total of last 24 hours
last24HourAmounts := map[string]map[string]float64{}
last24HourCounts := map[string]int{}
if last24Hours != "" {
wg.Add(1)
go func(prefix string) {
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourAddresses := addressesForInterval(tbl, ctx, prefix, start, now)
if amounts != "" {
for chain, addresses := range last24HourAddresses {
last24HourAmounts[chain] = map[string]float64{}
for address, amount := range addresses {
last24HourAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
if counts != "" {
for chain, addresses := range last24HourAddresses {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
last24HourCounts[chain] = numAddresses
last24HourCounts["*"] = last24HourCounts["*"] + numAddresses
}
}
}(prefix)
}
// total of the last numDays
addressesDailyAmounts := map[string]map[string]float64{}
addressesDailyCounts := map[string]int{}
if forPeriod != "" {
wg.Add(1)
go func(prefix string) {
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
periodAmounts := addressesTransferredToSinceDate(tbl, ctx, prefix, start)
if amounts != "" {
for chain, addresses := range periodAmounts {
addressesDailyAmounts[chain] = map[string]float64{}
for address, amount := range addresses {
addressesDailyAmounts[chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
if counts != "" {
for chain, addresses := range periodAmounts {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
addressesDailyCounts[chain] = numAddresses
addressesDailyCounts["*"] = addressesDailyCounts["*"] + numAddresses
}
}
}(prefix)
}
// daily totals
dailyAmounts := map[string]map[string]map[string]float64{}
dailyCounts := map[string]map[string]int{}
if daily != "" {
wg.Add(1)
go func(prefix string, queryDays int) {
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
dailyTotals := createAddressesOfInterval(tbl, ctx, prefix, start)
if amounts != "" {
for date, chains := range dailyTotals {
dailyAmounts[date] = map[string]map[string]float64{}
for chain, addresses := range chains {
dailyAmounts[date][chain] = map[string]float64{}
for address, amount := range addresses {
dailyAmounts[date][chain][address] = roundToTwoDecimalPlaces(amount)
}
}
}
}
if counts != "" {
for date, chains := range dailyTotals {
dailyCounts[date] = map[string]int{}
for chain, addresses := range chains {
// need to sum all the chains to get the total count of addresses,
// since addresses are not unique across chains.
numAddresses := len(addresses)
dailyCounts[date][chain] = numAddresses
dailyCounts[date]["*"] = dailyCounts[date]["*"] + numAddresses
}
}
}
}(prefix, queryDays)
}
wg.Wait()
result := &addressesResult{
Last24HoursAmounts: last24HourAmounts,
Last24HoursCounts: last24HourCounts,
WithinPeriodAmounts: addressesDailyAmounts,
WithinPeriodCounts: addressesDailyCounts,
PeriodDurationDays: queryDays,
DailyAmounts: dailyAmounts,
DailyCounts: dailyCounts,
}
w.Header().Add("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(result)
}
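The same handler also accepts GET requests with querystring parameters; an illustrative call, again with a placeholder URL:

package main

import (
    "fmt"
    "io"
    "net/http"
)

func main() {
    // placeholder URL; the query params mirror the GET branch above
    url := "https://REGION-PROJECT.cloudfunctions.net/AddressesTransferredTo?last24Hours=true&counts=true"
    resp, err := http.Get(url)
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    jsonBytes, _ := io.ReadAll(resp.Body)
    fmt.Println(string(jsonBytes))
}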

View File

@@ -5,19 +5,13 @@ go 1.16
// cloud runtime is go 1.16. just for reference.
require (
cloud.google.com/go/bigtable v1.10.1
cloud.google.com/go/pubsub v1.3.1
github.com/GoogleCloudPlatform/functions-framework-go v1.3.0
cloud.google.com/go/bigtable v1.12.0
cloud.google.com/go/pubsub v1.17.1
github.com/GoogleCloudPlatform/functions-framework-go v1.5.2 // indirect
github.com/certusone/wormhole/node v0.0.0-20211115153408-0a93202f6e5d
github.com/cosmos/cosmos-sdk v0.44.0
github.com/gagliardetto/solana-go v1.0.2
github.com/holiman/uint256 v1.2.0
github.com/mattn/go-isatty v0.0.14 // indirect
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f // indirect
golang.org/x/sys v0.0.0-20210903071746-97244b99971b // indirect
google.golang.org/api v0.48.0 // indirect
google.golang.org/grpc v1.40.0 // indirect
google.golang.org/protobuf v1.27.1 // indirect
)
replace github.com/gogo/protobuf => github.com/regen-network/protobuf v1.3.3-alpha.regen.1

View File

@@ -16,7 +16,6 @@ cloud.google.com/go v0.54.0/go.mod h1:1rq2OEkV3YMf6n/9ZvGWI3GWw0VoqH/1x2nd8Is/bP
cloud.google.com/go v0.56.0/go.mod h1:jr7tqZxxKOVYizybht9+26Z/gUq7tiRzu+ACVAMbKVk=
cloud.google.com/go v0.57.0/go.mod h1:oXiQ6Rzq3RAkkY7N6t3TcE6jE+CIBBbA36lwQ1JyzZs=
cloud.google.com/go v0.62.0/go.mod h1:jmCYTdRCQuc1PHIIJ/maLInMho30T/Y0M4hTdTShOYc=
cloud.google.com/go v0.63.0/go.mod h1:GmezbQc7T2snqkEXWfZ0sy0VfkB/ivI2DdtJL2DEmlg=
cloud.google.com/go v0.65.0/go.mod h1:O5N8zS7uWy9vkA9vayVHs65eM1ubvY4h553ofrNHObY=
cloud.google.com/go v0.72.0/go.mod h1:M+5Vjvlc2wnp6tjzE102Dw08nGShTscUx2nZMufOKPI=
cloud.google.com/go v0.74.0/go.mod h1:VV1xSbzvo+9QJOxLDaJfTjx5e+MePCpCWwvftOeQmWk=
@@ -24,8 +23,14 @@ cloud.google.com/go v0.78.0/go.mod h1:QjdrLG0uq+YwhjoVOLsS1t7TW8fs36kLs4XO5R5ECH
cloud.google.com/go v0.79.0/go.mod h1:3bzgcEeQlzbuEAYu4mrWhKqWjmpprinYgKJLgKHnbb8=
cloud.google.com/go v0.81.0/go.mod h1:mk/AM35KwGk/Nm2YSeZbxXdrNK3KZOYHmLkOqC2V6E0=
cloud.google.com/go v0.82.0/go.mod h1:vlKccHJGuFBFufnAnuB08dfEH9Y3H7dzDzRECFdC2TA=
cloud.google.com/go v0.83.0 h1:bAMqZidYkmIsUqe6PtkEPT7Q+vfizScn+jfNA6jwK9c=
cloud.google.com/go v0.83.0/go.mod h1:Z7MJUsANfY0pYPdw0lbnivPx4/vhy/e2FEkSkF7vAVY=
cloud.google.com/go v0.84.0/go.mod h1:RazrYuxIK6Kb7YrzzhPoLmCVzl7Sup4NrbKPg8KHSUM=
cloud.google.com/go v0.87.0/go.mod h1:TpDYlFy7vuLzZMMZ+B6iRiELaY7z/gJPaqbMx6mlWcY=
cloud.google.com/go v0.90.0/go.mod h1:kRX0mNRHe0e2rC6oNakvwQqzyDmg57xJ+SZU1eT2aDQ=
cloud.google.com/go v0.93.3/go.mod h1:8utlLll2EF5XMAV15woO4lSbWQlk8rer9aLOfLh7+YI=
cloud.google.com/go v0.94.1/go.mod h1:qAlAugsXlC+JWO+Bke5vCtc9ONxjQT3drlTTnAplMW4=
cloud.google.com/go v0.97.0 h1:3DXvAyifywvq64LfkKaMOmkWPS1CikIQdMe2lY9vxU8=
cloud.google.com/go v0.97.0/go.mod h1:GF7l59pYBVlXQIBLx3a761cZ41F9bBH3JUlihCt2Udc=
cloud.google.com/go/bigquery v1.0.1/go.mod h1:i/xbL2UlR5RvWAURpBYZTtm/cXjCha9lbfbpx4poX+o=
cloud.google.com/go/bigquery v1.3.0/go.mod h1:PjpwJnslEMmckchkHFfq+HTD2DmtT67aNFKH1/VBDHE=
cloud.google.com/go/bigquery v1.4.0/go.mod h1:S8dzgnTigyfTmLBfrtrhyYhwRxG72rYxvftPBK2Dvzc=
@@ -33,16 +38,22 @@ cloud.google.com/go/bigquery v1.5.0/go.mod h1:snEHRnqQbz117VIFhE8bmtwIDY80NLUZUM
cloud.google.com/go/bigquery v1.7.0/go.mod h1://okPTzCYNXSlb24MZs83e2Do+h+VXtc4gLoIoXIAPc=
cloud.google.com/go/bigquery v1.8.0/go.mod h1:J5hqkt3O0uAFnINi6JXValWIb1v0goeZM77hZzJN/fQ=
cloud.google.com/go/bigtable v1.2.0/go.mod h1:JcVAOl45lrTmQfLj7T6TxyMzIN/3FGGcFm+2xVAli2o=
cloud.google.com/go/bigtable v1.10.1 h1:QKcRHeAsraxIlrdCZ3LLobXKBvITqcOEnSbHG2rzL9g=
cloud.google.com/go/bigtable v1.10.1/go.mod h1:cyHeKlx6dcZCO0oSQucYdauseD8kIENGuDOJPKMCVg8=
cloud.google.com/go/bigtable v1.12.0 h1:wrT+HK/lW1biwbCQY1DJjRMelNjJLB5YPxDN40/BLaY=
cloud.google.com/go/bigtable v1.12.0/go.mod h1:W96Adxrf90LlA4fuB+UjFu/Y8OpoaK7y2kupKid5PPU=
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/functions v1.0.0 h1:cOFEDJ3sgAFRjRULSUJ0Q8cw9qFa5JdpXIBWoNX5uDw=
cloud.google.com/go/functions v1.0.0/go.mod h1:O9KS8UweFVo6GbbbCBKh5yEzbW08PVkg2spe3RfPMd4=
cloud.google.com/go/kms v1.0.0 h1:YkIeqPXqTAlwXk3Z2/WG0d6h1tqJQjU354WftjEoP9E=
cloud.google.com/go/kms v1.0.0/go.mod h1:nhUehi+w7zht2XrUfvTRNpxrfayBHqP4lu2NSywui/0=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
cloud.google.com/go/pubsub v1.3.1 h1:ukjixP1wl0LpnZ6LWtZJ0mX5tBmjp1f8Sqer8Z2OMUU=
cloud.google.com/go/pubsub v1.3.1/go.mod h1:i+ucay31+CNRpDW4Lu78I4xXG+O1r/MAHgjpRVR+TSU=
cloud.google.com/go/pubsub v1.17.1 h1:s2UGTTphpnUQ0Wppkp2OprR4pS3nlBpPvyL2GV9cqdc=
cloud.google.com/go/pubsub v1.17.1/go.mod h1:4qDxMr1WsM9+aQAz36ltDwCIM+R0QdlseyFjBuNvnss=
cloud.google.com/go/storage v1.0.0/go.mod h1:IhtSnM/ZTZV8YYJWCY8RULGVqBDmpoyjwiyrjsg+URw=
cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0ZeosJ0Rtdos=
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
@@ -89,8 +100,8 @@ github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
github.com/GoogleCloudPlatform/functions-framework-go v1.3.0 h1:mRl3Slv6JanYgytd+j1fXsV8kS8ZifW8Lkqy4VI0pPM=
github.com/GoogleCloudPlatform/functions-framework-go v1.3.0/go.mod h1:EZSBkJqP6+lFbW+M8ZET/r+uZRl3ENAEdoTNtk6NzGA=
github.com/GoogleCloudPlatform/functions-framework-go v1.5.2 h1:fPYZMZ8BSK2jfZ28VG6vYxr/PTLbG+9USn8njzxfmWM=
github.com/GoogleCloudPlatform/functions-framework-go v1.5.2/go.mod h1:pq+lZy4vONJ5fjd3q/B6QzWhfHPAbuVweLpxZzMOb9Y=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@@ -201,8 +212,8 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/v2 v2.2.0 h1:FlBJg7W0QywbOjuZGmRXUyFk8qkCHx2euETp+tuopSU=
github.com/cloudevents/sdk-go/v2 v2.2.0/go.mod h1:3CTrpB4+u7Iaj6fd7E2Xvm5IxMdRoaAhqaRVnOr2rCU=
github.com/cloudevents/sdk-go/v2 v2.6.1 h1:yHtzgmeBvc0TZx1nrnvYXov1CSvkQyvhEhNMs8Z5Mmk=
github.com/cloudevents/sdk-go/v2 v2.6.1/go.mod h1:nlXhgFkf0uTopxmRXalyMwS2LG70cRGPrxzmjJgSG0U=
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
github.com/cloudflare/cloudflare-go v0.14.0/go.mod h1:EnwdgGMaFOruiPZRFSgn+TsQ3hQ7C/YWzIGLeu5c304=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@@ -328,7 +339,6 @@ github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiD
github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4=
github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4=
github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ=
github.com/gagliardetto/binary v0.5.0 h1:fLVIbMEQp5zbW0SFtg0z/WHy2iUw/SMeuakzfa5ayhI=
github.com/gagliardetto/binary v0.5.0/go.mod h1:peJR9PvwamL4YOh1nHWCPLry2VEfeeD1ADvewka7HnQ=
@@ -464,6 +474,8 @@ github.com/google/pprof v0.0.0-20210122040257-d980be63207e/go.mod h1:kpwsk12EmLe
github.com/google/pprof v0.0.0-20210226084205-cbba55b83ad5/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210506205249-923b5ab0fc1a/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210601050228-01bbb1931b22/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210609004039-a478d1d731e9/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
@@ -475,8 +487,10 @@ github.com/googleapis/gax-go v2.0.0+incompatible h1:j0GKcs05QVmm7yesiZq2+9cxHkNK
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
github.com/googleapis/gax-go/v2 v2.0.3/go.mod h1:LLvjysVCY1JZeum8Z6l8qUty8fiNwE08qbEPm1M08qg=
github.com/googleapis/gax-go/v2 v2.0.4/go.mod h1:0Wqv26UfaUD9n4G6kQubkQ+KchISgw+vpHVxEJEs9eg=
github.com/googleapis/gax-go/v2 v2.0.5 h1:sjZBwGj9Jlw33ImPtvFviGYvseOtDM7hkSKB7+Tv3SM=
github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5mhpdKc/us6bOk=
github.com/googleapis/gax-go/v2 v2.1.0/go.mod h1:Q3nei7sK6ybPYH7twZdmQpAd1MKb7pfu6SK+H1/DsU0=
github.com/googleapis/gax-go/v2 v2.1.1 h1:dp3bWCh+PPO1zjRRiCSczJav13sBvG4UhNyVTa1KqdU=
github.com/googleapis/gax-go/v2 v2.1.1/go.mod h1:hddJymUZASv3XPyGkUpKj8pPO47Rmb0eJc8R6ouapiM=
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gordonklaus/ineffassign v0.0.0-20200309095847-7953dde2c7bf/go.mod h1:cuNKsD1zp2v6XfE/orVX2QE1LC+i254ceGcVeDT3pTU=
github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg=
@@ -640,7 +654,6 @@ github.com/json-iterator/go v1.1.10/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/
github.com/json-iterator/go v1.1.11 h1:uVUAXhF2To8cbw/3xN3pxj6kk7TYKs98NIrTqPlMWAQ=
github.com/json-iterator/go v1.1.11/go.mod h1:KdQUCv79m/52Kvf8AW2vK1V8akMuk1QjK/uOdHXbAo4=
github.com/jstemmer/go-junit-report v0.0.0-20190106144839-af01ea7f8024/go.mod h1:6v2b51hI/fHJwM22ozAgKL4VKDeJcHhJFhtBdhmNjmU=
github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/jstemmer/go-junit-report v0.9.1/go.mod h1:Brl9GWCQeLvo8nXZwPNNblvFj/XSXhF0NWZEnDohbsk=
github.com/jsternberg/zap-logfmt v1.0.0/go.mod h1:uvPs/4X51zdkcm5jXl5SYoN+4RK21K8mysFmDaM/h+o=
github.com/jtolds/gls v4.20.0+incompatible/go.mod h1:QJZ7F/aHp+rZTRtaJ1ow/lLfFfVYBRgL+9YlvaHOwJU=
@@ -866,8 +879,6 @@ github.com/libp2p/go-yamux/v2 v2.0.0/go.mod h1:NVWira5+sVUIU6tu1JWvaRn1dRnG+cawO
github.com/libp2p/go-yamux/v2 v2.2.0/go.mod h1:3So6P6TV6r75R9jiBpiIKgU/66lOarCZjqROGxzPpPQ=
github.com/lightstep/lightstep-tracer-common/golang/gogo v0.0.0-20190605223551-bc2310a04743/go.mod h1:qklhhLq1aX+mtWk9cPHPzaBjWImj5ULL6C7HFJtXQMM=
github.com/lightstep/lightstep-tracer-go v0.18.1/go.mod h1:jlF1pusYV4pidLvZ+XD0UBX0ZE6WURAspgAczcDHrL4=
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac h1:+2b6iGRJe3hvV/yVXrd41yVEjxuFHxasJqDhkIjS4gk=
github.com/lightstep/tracecontext.go v0.0.0-20181129014701-1757c391b1ac/go.mod h1:Frd2bnT3w5FB5q49ENTfVlztJES+1k/7lyWX2+9gq/M=
github.com/logrusorgru/aurora v2.0.3+incompatible h1:tOpm7WcpBTn4fjmVfgpQq0EfczGlG91VSDkswnjF5A8=
github.com/logrusorgru/aurora v2.0.3+incompatible/go.mod h1:7rIyQOR62GCctdiQpZ/zOJlFyk6y+94wXzv6RNZgaR4=
github.com/lucas-clemente/quic-go v0.19.3/go.mod h1:ADXpNbTQjq1hIzCpB+y/k5iz4n4z4IwqoLb94Kh5Hu8=
@@ -901,9 +912,8 @@ github.com/mattn/go-isatty v0.0.4/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNx
github.com/mattn/go-isatty v0.0.5-0.20180830101745-3fb116b82035/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.5/go.mod h1:Iq45c/XA43vh69/j3iqttzPXn0bhXyGjM0Hdxcsrc5s=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.13 h1:qdl+GuBjcsKKDco5BsxPJlId98mSWNKqYA+Co0SC1yA=
github.com/mattn/go-isatty v0.0.13/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14 h1:yVuAays6BHfxijgZPzw+3Zlu5yQgKGP2/hcQbHb7S9Y=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
github.com/mattn/go-runewidth v0.0.2/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.3/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
github.com/mattn/go-runewidth v0.0.4/go.mod h1:LwmH8dsx7+W8Uxz3IHJYH5QSwggIsqBzpuz5H//U1FU=
@@ -1025,7 +1035,6 @@ github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLA
github.com/nishanths/predeclared v0.0.0-20200524104333-86fad755b4d3/go.mod h1:nt3d53pc1VYcphSCIaYAJtnPYnr3Zyn8fMq2wvPGPso=
github.com/nkovacs/streamquote v0.0.0-20170412213628-49af9bddb229/go.mod h1:0aYXnNPJ8l7uZxf45rWW1a/uME32OF0rhiYGNQ2oF2E=
github.com/nxadm/tail v1.4.4/go.mod h1:kenIhsEOeOJmVchQTgglprH7qJGnHDVpk1VPCcaMI8A=
github.com/nxadm/tail v1.4.8 h1:nPr65rt6Y5JFSKQO7qToXr7pePgD6Gwiw05lkbyAQTE=
github.com/nxadm/tail v1.4.8/go.mod h1:+ncqLTQzXmGhMZNUePPaPqPvBxHAIsmXswZKocGu+AU=
github.com/oklog/oklog v0.3.2/go.mod h1:FCV+B7mhrz4o+ueLpx+KqkyXRGMWOYEvfiXtdGtbWGs=
github.com/oklog/run v1.0.0/go.mod h1:dlhp/R75TPv97u0XWUtDeV/lRKWPKSdTuV0TZvrmrQA=
@@ -1037,23 +1046,17 @@ github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.7.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.2/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.12.0/go.mod h1:oUhWkIvk5aDxtKvDDuw8gItl8pKl42LzjC9KZE0HfGg=
github.com/onsi/ginkgo v1.12.1/go.mod h1:zj2OWP4+oCPe1qIXoGWkgMRwljMUYCdkwsT2108oapk=
github.com/onsi/ginkgo v1.14.0/go.mod h1:iSB4RoI2tjJc9BBv4NKIKWKya62Rps+oPG/Lv9klQyY=
github.com/onsi/ginkgo v1.16.2/go.mod h1:CObGmKUOKaSC0RjmoAK7tKyn4Azo5P2IWuoMnvwxz1E=
github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc=
github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0=
github.com/onsi/gomega v1.4.1/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA=
github.com/onsi/gomega v1.4.3/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.1/go.mod h1:XdKZgCCFLUoM/7CFJVPcG8C1xQ1AJ0vpAezJrB7JYyY=
github.com/onsi/gomega v1.8.1/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/onsi/gomega v1.9.0/go.mod h1:Ho0h+IUsWyvy1OpqCwxlQ/21gkhVunqlU8fDGcoTdcA=
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.13.0 h1:7lLHu94wT9Ij0o6EWWclhu0aOh32VxhkwEJvzuWPeak=
github.com/onsi/gomega v1.13.0/go.mod h1:lRk9szgn8TxENtWd0Tp4c3wjlRfMTMH27I+3Je41yGY=
github.com/op/go-logging v0.0.0-20160315200505-970db520ece7/go.mod h1:HzydrMdWErDVzsI23lYNej1Htcns9BCg93Dk0bBINWk=
github.com/opentracing-contrib/go-observer v0.0.0-20170622124052-a52f23424492/go.mod h1:Ngi6UdF0k5OKD5t5wlmGhe/EDKPoUM3BXZSSfIuJbis=
@@ -1305,7 +1308,6 @@ github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLY
github.com/urfave/cli v1.20.0/go.mod h1:70zkFmudgCuE/ngEzBv17Jvp/497gISqfk5gWijbERA=
github.com/urfave/cli v1.22.1/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0=
github.com/urfave/cli/v2 v2.3.0/go.mod h1:LJmUH05zAU44vOAcrfzZQKsZbVcdbOG8rtL3/XcUArI=
github.com/valyala/bytebufferpool v1.0.0 h1:GqA5TC/0021Y/b9FG4Oi9Mr3q7XYx6KllzawFIhcdPw=
github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyCJ6HpOuEn7z0Csc=
github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8=
github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU=
@@ -1447,7 +1449,6 @@ golang.org/x/lint v0.0.0-20191125180803-fdd1cda4f05f/go.mod h1:5qLYkcX4OjUUV8bRu
golang.org/x/lint v0.0.0-20200130185559-910be7a94367/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20200302205851-738671d3881b/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20201208152925-83fdc39ff7b5/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 h1:VLliZ0d+/avPrXXH+OakdXhpJuEoBZuwh1m2j7U6Iug=
golang.org/x/lint v0.0.0-20210508222113-6edffad5e616/go.mod h1:3xt1FjdF8hUf6vQPIChWIBhFzV8gjjsPE/fR3IyQdNY=
golang.org/x/mobile v0.0.0-20190312151609-d3739f865fa6/go.mod h1:z+o9i4GpDbdi3rU15maQ/Ox0txvL9dWGYEHz965HBQE=
golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o=
@@ -1461,7 +1462,6 @@ golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.1/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.4.2 h1:Gz96sIWK3OalVv/I/qNygP42zyoKp3xptRVCWRFEBvo=
golang.org/x/mod v0.4.2/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180719180050-a680a1efc54d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@@ -1523,9 +1523,8 @@ golang.org/x/net v0.0.0-20210423184538-5f58ad60dda6/go.mod h1:OJAsFXCWl8Ukc7SiCT
golang.org/x/net v0.0.0-20210428140749-89ef3d95e781/go.mod h1:OJAsFXCWl8Ukc7SiCT/9KSuxbyM7479/AVlXFRxuMCk=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210510120150-4163338589ed/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5 h1:wjuX4b5yYQnEQHzd+CBcrcC6OVR2J1CN6mUy0oSxIPo=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f h1:w6wWR0H+nyVpbSAQbzVEIACVyr/h8l/BEkY6Sokc7Eg=
golang.org/x/net v0.0.0-20210903162142-ad29c8ab022f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181017192945-9dcd33a902f4/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20181203162652-d668ce993890/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
@@ -1542,8 +1541,12 @@ golang.org/x/oauth2 v0.0.0-20210313182246-cd4f82c27b84/go.mod h1:KelEdhl1UZF7XfJ
golang.org/x/oauth2 v0.0.0-20210402161424-2e8d93401602/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210427180440-81ed05c6b58c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210514164344-f6687ab2804c/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210615190721-d04028783cf1 h1:x622Z2o4hgCr/4CiKWc51jHVKaWdtVpBNmEI8wI9Qns=
golang.org/x/oauth2 v0.0.0-20210615190721-d04028783cf1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210628180205-a41e5a781914/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210805134026-6f1e6394065a/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1 h1:B333XXssMuKQeBwiNODx4TupZy7bf4sxFZnN2ZOcvUE=
golang.org/x/oauth2 v0.0.0-20211005180243-6b3c2da341f1/go.mod h1:KelEdhl1UZF7XfJ4dDtk6s++YSgaE7mD/BuKKDLBl4A=
golang.org/x/perf v0.0.0-20180704124530-6e6d33e29852/go.mod h1:JLpeXjPJfIyPr5TlbXLkXWLhP8nz10XfvxElABhCtcw=
golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20181108010431-42b317875d0f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
@@ -1652,9 +1655,15 @@ golang.org/x/sys v0.0.0-20210514084401-e8d321eab015/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20210601080250-7ecdf8ef093b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603081109-ebe580a85c40/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210603125802-9665404d3644/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210903071746-97244b99971b h1:3Dq0eVHn0uaQJmPO+/aYPI/fRMqdrVDbu7MQcku54gg=
golang.org/x/sys v0.0.0-20210903071746-97244b99971b/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210806184541-e5e7981a1069/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210823070655-63515b42dcdf/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210908233432-aa78b53d3365/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20210917161153-d61c044b1678/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211007075335-d3039528d8ac/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359 h1:2B5p2L5IfGiD7+b9BOoRMC6DgObAVZV+Fsp050NqXik=
golang.org/x/sys v0.0.0-20211025201205-69cdffdb9359/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201117132131-f5c789dd3221/go.mod h1:Nr5EML6q2oocZ2LXRh80K7BxOlk5/8JxuGnuhpl+muw=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20201210144234-2321bbc49cbf h1:MZ2shdL+ZM/XzY3ZGOnh4Nlpnxz5GSOhOmtHo3iPU6M=
@@ -1674,6 +1683,7 @@ golang.org/x/time v0.0.0-20190308202827-9d24e82272b4/go.mod h1:tRJNPiyCQ0inRvYxb
golang.org/x/time v0.0.0-20191024005414-555d28b269f0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20200630173020-3af7569d3a1e/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20201208040808-7e3f01d25324/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ=
golang.org/x/tools v0.0.0-20180525024113-a5b4c53f6e8b/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
@@ -1732,7 +1742,6 @@ golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roY
golang.org/x/tools v0.0.0-20200717024301-6ddee64345a6/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200729194436-6467de6f59a7/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200804011535-6c149bb5ef0d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200806022845-90696ccdc692/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200825202427-b303f430e36d/go.mod h1:njjCfa9FT2d7l9Bc6FUM5FLjQPp3cFF28FI3qnDFljA=
golang.org/x/tools v0.0.0-20200904185747-39188db58858/go.mod h1:Cj7w3i3Rnn0Xh82ur9kSqwfTHTeVxaDqrfMjpcNT6bE=
golang.org/x/tools v0.0.0-20201110124207-079ba7bd75cd/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
@@ -1744,8 +1753,9 @@ golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4f
golang.org/x/tools v0.1.0/go.mod h1:xkSsbof2nBLbhDlRMhhhyNLN/zl3eTqcnHD5viDpcZ0=
golang.org/x/tools v0.1.1/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.2/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.3 h1:L69ShwSZEyCsLKoAxDKeMvLDZkumEe8gXUZAjab0tX8=
golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
@@ -1786,8 +1796,16 @@ google.golang.org/api v0.43.0/go.mod h1:nQsDGjRXMo4lvh5hP0TKqF244gqhGcr/YSIykhUk
google.golang.org/api v0.44.0/go.mod h1:EBOGZqzyhtvMDoxwS97ctnh0zUmYY6CxqXsc1AvkYD8=
google.golang.org/api v0.46.0/go.mod h1:ceL4oozhkAiTID8XMmJBsIxID/9wMXJVVFXPg4ylg3I=
google.golang.org/api v0.47.0/go.mod h1:Wbvgpq1HddcWVtzsVLyfLp8lDg6AA241LmgIL59tHXo=
google.golang.org/api v0.48.0 h1:RDAPWfNFY06dffEXfn7hZF5Fr1ZbnChzfQZAPyBd1+I=
google.golang.org/api v0.48.0/go.mod h1:71Pr1vy+TAZRPkPs/xlCf5SsU8WjuAWv1Pfjbtukyy4=
google.golang.org/api v0.50.0/go.mod h1:4bNT5pAuq5ji4SRZm+5QIkjny9JAyVD/3gaSihNefaw=
google.golang.org/api v0.51.0/go.mod h1:t4HdrdoNgyN5cbEfm7Lum0lcLDLiise1F8qDKX00sOU=
google.golang.org/api v0.54.0/go.mod h1:7C4bFFOvVDGXjfDTAsgGwDgAxRDeQ4X8NvUedIt6z3k=
google.golang.org/api v0.55.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.56.0/go.mod h1:38yMfeP1kfjsl8isn0tliTjIb1rJXcQi4UXlbqivdVE=
google.golang.org/api v0.57.0/go.mod h1:dVPlbZyBo2/OjBpmvNdpn2GRm6rPy75jyU7bmhdrMgI=
google.golang.org/api v0.58.0/go.mod h1:cAbP2FsxoGVNwtgNAmmn3y5G1TWAiVYRmg4yku3lv+E=
google.golang.org/api v0.59.0 h1:fPfFO7gttlXYo2ALuD3HxJzh8vaF++4youI0BkFL6GE=
google.golang.org/api v0.59.0/go.mod h1:sT2boj7M9YJxZzgeZqXogmhfmRWDtPzT31xkieUbuZU=
google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM=
google.golang.org/appengine v1.2.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
google.golang.org/appengine v1.3.0/go.mod h1:xpcJRLb0r/rnEns0DIKYYv+WjYCduHsrkT7/EB5XEv4=
@@ -1837,7 +1855,6 @@ google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEY
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200804131852-c06518451d9c/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200806141610-86f49bd18e98/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200825200019-8632dd797987/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20200904004341-0bd0a958aa1d/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
google.golang.org/genproto v0.0.0-20201109203340-2640f1f9cdfb/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
@@ -1857,8 +1874,26 @@ google.golang.org/genproto v0.0.0-20210517163617-5e0236093d7a/go.mod h1:P3QM42oQ
google.golang.org/genproto v0.0.0-20210601144548-a796c710e9b6/go.mod h1:P3QM42oQyzQSnHPnZ/vqoCdDmzH28fzWByN9asMeM8A=
google.golang.org/genproto v0.0.0-20210602131652-f16073e35f0c/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20210604141403-392c879c8b08/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced h1:c5geK1iMU3cDKtFrCVQIcjR3W+JOZMuhIyICMCTbtus=
google.golang.org/genproto v0.0.0-20210608205507-b6d2f5bf0d7d/go.mod h1:UODoCrxHCcBojKKwX1terBiRUaqAsFqJiF615XL43r0=
google.golang.org/genproto v0.0.0-20210617175327-b9e0b3197ced/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24=
google.golang.org/genproto v0.0.0-20210624195500-8bfb893ecb84/go.mod h1:SzzZ/N+nwJDaO1kznhnlzqS8ocJICar6hYhVyhi++24=
google.golang.org/genproto v0.0.0-20210713002101-d411969a0d9a/go.mod h1:AxrInvYm1dci+enl5hChSFPOmmUF1+uAa/UsgNRWd7k=
google.golang.org/genproto v0.0.0-20210716133855-ce7ef5c701ea/go.mod h1:AxrInvYm1dci+enl5hChSFPOmmUF1+uAa/UsgNRWd7k=
google.golang.org/genproto v0.0.0-20210728212813-7823e685a01f/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/genproto v0.0.0-20210805201207-89edb61ffb67/go.mod h1:ob2IJxKrgPT52GcgX759i1sleT07tiKowYBGbczaW48=
google.golang.org/genproto v0.0.0-20210813162853-db860fec028c/go.mod h1:cFeNkxwySK631ADgubI+/XFU/xp8FD5KIVV4rj8UC5w=
google.golang.org/genproto v0.0.0-20210821163610-241b8fcbd6c8/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210828152312-66f60bf46e71/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210831024726-fe130286e0e2/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210903162649-d08c68adba83/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210909211513-a8c4777a87af/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEctNawg4eG61bRv87N7iHBWyVhJu7u1kqDUXY=
google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211019152133-63b7e35f4404/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211027162914-98a5263abeca h1:+e+aQDO4/c9KaG8PXWHTc6/+Du6kz+BKcXCSnV4SSTE=
google.golang.org/genproto v0.0.0-20211027162914-98a5263abeca/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/grpc v1.14.0/go.mod h1:yo6s7OP7yaDglbqo1J04qKzAhqBH6lvTonzMVmEdcZw=
google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio=
google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs=
@ -1891,6 +1926,8 @@ google.golang.org/grpc v1.36.1/go.mod h1:qjiiYl8FncCW8feJPdyg3v6XW24KsRHe+dy9BAG
google.golang.org/grpc v1.37.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.37.1/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.38.0/go.mod h1:NREThFqKR1f3iQ6oBuvc5LadQuXVGo9rkm5ZGrQdJfM=
google.golang.org/grpc v1.39.0/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.39.1/go.mod h1:PImNr+rS9TWYb2O4/emRugxiyHZ5JyHW5F+RPnDzfrE=
google.golang.org/grpc v1.40.0 h1:AGJ0Ih4mHjSeibYkFGh1dD9KJ/eOtZ93I6hoHhukQ5Q=
google.golang.org/grpc v1.40.0/go.mod h1:ogyxbiOoUXAkP+4+xa6PZSE9DZgIHtSpzjDTB9KAK34=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
@ -1928,7 +1965,6 @@ gopkg.in/olebedev/go-duktape.v3 v3.0.0-20200619000410-60c24ae608a6/go.mod h1:uAJ
gopkg.in/resty.v1 v1.12.0/go.mod h1:mDo4pnntr5jdWRML875a/NmxYqAlA73dVijT2AXvQQo=
gopkg.in/src-d/go-cli.v0 v0.0.0-20181105080154-d492247bbc0d/go.mod h1:z+K8VcOYVYcSwSjGebuDL6176A1XskgbtNl64NSg+n8=
gopkg.in/src-d/go-log.v1 v1.0.1/go.mod h1:GN34hKP0g305ysm2/hctJ0Y8nWP3zxXXJ8GFabTyABE=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/urfave/cli.v1 v1.20.0/go.mod h1:vuBzUtMdQeixQj8LVd+/98pzhxNGQoyuPBlsXHOQNO0=
gopkg.in/warnings.v0 v0.1.2/go.mod h1:jksf8JmL6Qr/oQM2OXTHunEvvTAsrWBLb6OOjuVWRNI=

View File

@ -0,0 +1,380 @@
// Package p contains an HTTP Cloud Function.
package p
import (
// "bytes"
"context"
// "encoding/binary"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"sort"
"strconv"
"sync"
"time"
"cloud.google.com/go/bigtable"
)
type cumulativeResult struct {
AllTime map[string]map[string]float64
AllTimeDurationDays int
Daily map[string]map[string]map[string]float64
}
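// Illustrative sketch of the JSON shape this struct marshals to (hypothetical
// chain IDs, symbols, and amounts; not part of this commit):
//
//	{
//	  "AllTime": {"*": {"*": 1000.0, "UST": 400.0}, "2": {"UST": 400.0}},
//	  "AllTimeDurationDays": 90,
//	  "Daily": {"2021-12-12": {"2": {"UST": 25.0}}}
//	}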
// an in-memory cache of previously calculated results
var warmCumulativeCache = map[string]map[string]map[string]map[string]float64{}
var muWarmCumulativeCache sync.RWMutex
var warmCumulativeCacheFilePath = "/notional-transferred-to-cumulative-cache.json"
var transferredToUpToYesterday = map[string]map[string]map[string]map[string]float64{}
var muTransferredToUpToYesterday sync.RWMutex
var transferredToUpToYesterdayFilePath = "/notional-transferred-to-up-to-yesterday-cache.json"
// calculates the total amount of each symbol transferred to each chain since the given start time.
func transferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
now := time.Now().UTC()
today := now.Format("2006-01-02")
oneDayAgo := -time.Duration(24) * time.Hour
yesterday := now.Add(oneDayAgo).Format("2006-01-02")
result := map[string]map[string]float64{"*": {"*": 0}}
// create the unique identifier for this query, for cache
cachePrefix := createCachePrefix(prefix)
muTransferredToUpToYesterday.Lock()
if _, ok := transferredToUpToYesterday[cachePrefix]; !ok {
transferredToUpToYesterday[cachePrefix] = map[string]map[string]map[string]float64{}
}
if cacheData, ok := transferredToUpToYesterday[cachePrefix][yesterday]; ok {
// cache has data through midnight yesterday
result = cacheData
// set the start to be the start of today
start = time.Date(now.Year(), now.Month(), now.Day(), 0, 0, 0, 0, now.Location())
}
muTransferredToUpToYesterday.Unlock()
dailyTotals := amountsTransferredToInInterval(tbl, ctx, prefix, start)
// loop through the query results to combine cache + fresh data
for _, chains := range dailyTotals {
for chain, tokens := range chains {
// ensure the chain exists in the result map
if _, ok := result[chain]; !ok {
result[chain] = map[string]float64{"*": 0}
}
for symbol, amount := range tokens {
if _, ok := result[chain][symbol]; !ok {
result[chain][symbol] = 0
}
// add the amount of this symbol transferred this day to the
// amount already in the result (amount of this symbol previously transferred)
result[chain][symbol] = result[chain][symbol] + amount
}
}
}
muTransferredToUpToYesterday.Lock()
if _, ok := transferredToUpToYesterday[cachePrefix][yesterday]; !ok {
// no cache, populate it
// deep copy the result, so today's amounts can be subtracted from the copy
// without also mutating the inner maps of the result that will be returned.
upToYesterday := map[string]map[string]float64{}
for chain, tokens := range result {
upToYesterday[chain] = map[string]float64{}
for symbol, amount := range tokens {
upToYesterday[chain][symbol] = amount
}
}
for chain, tokens := range dailyTotals[today] {
for symbol, amount := range tokens {
upToYesterday[chain][symbol] = upToYesterday[chain][symbol] - amount
}
}
transferredToUpToYesterday[cachePrefix][yesterday] = upToYesterday
muTransferredToUpToYesterday.Unlock()
// write the updated cache to disc
persistInterfaceToJson(transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, transferredToUpToYesterday)
} else {
muTransferredToUpToYesterday.Unlock()
}
return result
}
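// A minimal sketch of the cache math above (hypothetical values, not part of
// this commit): the running total includes today, so subtracting today's
// daily amounts yields a stable "up to yesterday" snapshot that is safe to
// cache under yesterday's date and reuse by later invocations:
//
//	result["2"]["UST"]        = 100 // total since start, including today
//	today's amount            = 7   // transferred so far today
//	upToYesterday["2"]["UST"] = 93  // cached, stable until midnight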
// returns a slice of dates (strings) for each day in the period. Dates formatted: "2021-12-30".
func getDaysInRange(start, end time.Time) []string {
now := time.Now().UTC()
numDays := int(end.Sub(start).Hours() / 24)
days := []string{}
for daysAgo := 0; daysAgo <= numDays; daysAgo++ {
hoursAgo := (24 * daysAgo)
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
n := now.Add(daysAgoDuration)
year := n.Year()
month := n.Month()
day := n.Day()
loc := n.Location()
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
dateStr := start.Format("2006-01-02")
days = append(days, dateStr)
}
return days
}
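// Example of the function above (illustrative only, assuming "now" is
// 2021-12-13 UTC): a 48-hour range yields three date strings, newest first,
// because the loop walks backwards from the current day. Note the dates are
// anchored at time.Now rather than at "end", so callers pass end == now:
//
//	getDaysInRange(now.Add(-48*time.Hour), now)
//	// => []string{"2021-12-13", "2021-12-12", "2021-12-11"}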
// calculates a running total of notional value transferred, by symbol, since the specified start time.
func createCumulativeAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
now := time.Now().UTC()
today := now.Format("2006-01-02")
cachePrefix := createCachePrefix(prefix)
cacheNeedsUpdate := false
muWarmCumulativeCache.Lock()
if _, ok := warmCumulativeCache[cachePrefix]; !ok {
warmCumulativeCache[cachePrefix] = map[string]map[string]map[string]float64{}
}
muWarmCumulativeCache.Unlock()
results := map[string]map[string]map[string]float64{}
// fetch the amounts of transfers by symbol, for each day since launch (releaseDay)
dailyAmounts := amountsTransferredToInInterval(tbl, ctx, prefix, releaseDay)
// create a slice of dates, order oldest first
dateKeys := make([]string, 0, len(dailyAmounts))
for k := range dailyAmounts {
dateKeys = append(dateKeys, k)
}
sort.Strings(dateKeys)
// iterate through the dates in the result set, and accumulate the amounts
// of each token transferred, by symbol, based on the destination of the transfer.
for i, date := range dateKeys {
muWarmCumulativeCache.RLock()
if dateCache, ok := warmCumulativeCache[cachePrefix][date]; ok && dateCache != nil {
// have a cached value for this day, use it.
results[date] = dateCache
muWarmCumulativeCache.RUnlock()
} else {
// no cached value for this day, must calculate it
muWarmCumulativeCache.RUnlock()
if i == 0 {
// special case for first day, no need to sum.
results[date] = dailyAmounts[date]
} else {
results[date] = map[string]map[string]float64{"*": {"*": 0}}
// find the date string of the previous day
prevDate := dateKeys[i-1]
prevDayAmounts := results[prevDate]
thisDayAmounts := dailyAmounts[date]
// iterate through all the transfers and add the previous day's amount, if it exists
for chain, thisDaySymbols := range thisDayAmounts {
// create a union of the symbols from this day, and previous days
symbolsUnion := map[string]string{}
for symbol := range prevDayAmounts[chain] {
symbolsUnion[symbol] = symbol
}
for symbol := range thisDaySymbols {
symbolsUnion[symbol] = symbol
}
// initialize the chain/symbol map for this date
if _, ok := results[date][chain]; !ok {
results[date][chain] = map[string]float64{"*": 0}
}
// iterate through the union of symbols, creating an amount for each one,
// and adding it to the results.
for symbol := range symbolsUnion {
thisDayAmount := float64(0)
if amt, ok := thisDaySymbols[symbol]; ok {
thisDayAmount = amt
}
prevDayAmount := float64(0)
if amt, ok := results[prevDate][chain][symbol]; ok {
prevDayAmount = amt
}
cumulativeAmount := prevDayAmount + thisDayAmount
results[date][chain][symbol] = cumulativeAmount
}
}
}
// don't cache today, since it is still accumulating transfers
if date != today {
// set the result in the cache
muWarmCumulativeCache.Lock()
if _, ok := warmCumulativeCache[cachePrefix][date]; !ok {
// cache does not have this date, persist it for other instances.
warmCumulativeCache[cachePrefix][date] = results[date]
cacheNeedsUpdate = true
}
muWarmCumulativeCache.Unlock()
}
}
}
if cacheNeedsUpdate {
persistInterfaceToJson(warmCumulativeCacheFilePath, &muWarmCumulativeCache, warmCumulativeCache)
}
// take the most recent n days, rather than returning all days since launch
selectDays := map[string]map[string]map[string]float64{}
days := getDaysInRange(start, now)
for _, day := range days {
selectDays[day] = results[day]
}
return selectDays
}
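// A worked example of the accumulation above (hypothetical amounts, not part
// of this commit). Daily amounts for one chain/symbol of
//
//	2021-12-10: 5, 2021-12-11: 0 (no transfers), 2021-12-12: 3
//
// become the cumulative results
//
//	2021-12-10: 5, 2021-12-11: 5, 2021-12-12: 8
//
// Each day is derived from the previous day's result, which is why dateKeys
// is sorted oldest-first before the loop.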
// calculates the cumulative value transferred each day since launch.
func NotionalTransferredToCumulative(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, forChain, forAddress, daily, allTime string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
numDays = queryParams.Get("numDays")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
daily = queryParams.Get("daily")
allTime = queryParams.Get("allTime")
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
Daily string `json:"daily"`
AllTime string `json:"allTime"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
numDays = d.NumDays
forChain = d.ForChain
forAddress = d.ForAddress
daily = d.Daily
allTime = d.AllTime
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
if daily == "" && allTime == "" {
// none of the options were set, so set one
allTime = "true"
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
fmt.Fprint(w, "numDays must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create the rowkey prefix for querying
prefix := ""
if forChain != "" {
prefix = forChain
// restrict the query to the given chain
if forAddress != "" {
// restrict the query to the given chain and address
prefix = forChain + ":" + forAddress
}
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
// total since launch
periodTransfers := map[string]map[string]float64{}
allTimeDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
if allTime != "" {
wg.Add(1)
go func(prefix string) {
defer wg.Done()
transfers := transferredToSince(tbl, context.Background(), prefix, releaseDay)
for chain, tokens := range transfers {
periodTransfers[chain] = map[string]float64{}
for symbol, amount := range tokens {
periodTransfers[chain][symbol] = roundToTwoDecimalPlaces(amount)
}
}
}(prefix)
}
// daily transfers by chain
dailyTransfers := map[string]map[string]map[string]float64{}
if daily != "" {
wg.Add(1)
go func(prefix string, queryDays int) {
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
transfers := createCumulativeAmountsOfInterval(tbl, ctx, prefix, start)
for date, chains := range transfers {
dailyTransfers[date] = map[string]map[string]float64{}
for chain, tokens := range chains {
dailyTransfers[date][chain] = map[string]float64{}
for symbol, amount := range tokens {
dailyTransfers[date][chain][symbol] = roundToTwoDecimalPlaces(amount)
}
}
}
}(prefix, queryDays)
}
wg.Wait()
result := &cumulativeResult{
AllTime: periodTransfers,
AllTimeDurationDays: allTimeDays,
Daily: dailyTransfers,
}
jsonBytes, err := json.Marshal(result)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
w.WriteHeader(http.StatusOK)
w.Write(jsonBytes)
}
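// Example requests this handler accepts (illustrative only; the route is
// registered as /notionaltransferredtocumulative in newMux):
//
//	GET  /notionaltransferredtocumulative?numDays=30&daily=true
//	POST /notionaltransferredtocumulative
//	     {"numDays": "30", "forChain": "2", "daily": "true", "allTime": "true"}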

View File

@ -24,11 +24,10 @@ type amountsResult struct {
Daily map[string]map[string]map[string]float64
}
// warmCache keeps some data around between invocations, so that we don't have
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
// TODO - make a struct for cache
var warmAmountsCache = map[string]map[string]map[string]map[string]float64{}
// an in-memory cache of previously calculated results
var warmTransfersToCache = map[string]map[string]map[string]map[string]float64{}
var muWarmTransfersToCache sync.RWMutex
var warmTransfersToCacheFilePath = "/notional-transferred-to-cache.json"
type TransferData struct {
TokenSymbol string
@ -39,7 +38,8 @@ type TransferData struct {
Notional float64
}
func fetchAmountRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]TransferData, error) {
// finds all the TokenTransfer rows within the specified period
func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) []TransferData {
rows := []TransferData{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
@ -97,17 +97,17 @@ func fetchAmountRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix
),
))
if err != nil {
fmt.Println("failed reading rows to create RowList.", err)
return nil, err
log.Fatalln("failed reading rows to create RowList.", err)
}
return rows, err
return rows
}
func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int) (map[string]map[string]map[string]float64, error) {
var mu sync.RWMutex
// finds the daily amount of each symbol transferred to each chain, from the specified start to the present.
func amountsTransferredToInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
results := map[string]map[string]map[string]float64{}
now := time.Now().UTC()
numPrevDays := int(now.Sub(start).Hours() / 24)
var intervalsWG sync.WaitGroup
// there will be a query for each previous day, plus today
@ -116,6 +116,8 @@ func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix st
// create the unique identifier for this query, for cache
cachePrefix := createCachePrefix(prefix)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
// start is the start of the day (SOD), end is the end of the day (EOD)
@ -136,46 +138,35 @@ func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix st
dateStr := start.Format("2006-01-02")
mu.Lock()
muWarmTransfersToCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]map[string]float64{"*": {"*": 0}}
// check to see if there is cache data for this date/query
if dates, ok := warmAmountsCache[cachePrefix]; ok {
if dates, ok := warmTransfersToCache[cachePrefix]; ok {
// have a cache for this query
if dateCache, ok := dates[dateStr]; ok {
if dateCache, ok := dates[dateStr]; ok && len(dateCache) > 1 {
// have a cache for this date
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = dateCache
mu.Unlock()
muWarmTransfersToCache.Unlock()
intervalsWG.Done()
return
}
} else {
// no cache for this query
warmAmountsCache[cachePrefix][dateStr] = map[string]map[string]float64{}
}
} else {
// no cache for this date, initialize the map
warmAmountsCache[cachePrefix] = map[string]map[string]map[string]float64{}
warmAmountsCache[cachePrefix][dateStr] = map[string]map[string]float64{}
// no cache for this query, initialize the map
warmTransfersToCache[cachePrefix] = map[string]map[string]map[string]float64{}
}
mu.Unlock()
var result []TransferData
var fetchErr error
muWarmTransfersToCache.Unlock()
defer intervalsWG.Done()
result, fetchErr = fetchAmountRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Fatalf("fetchAmountRowsInInterval returned an error: %v\n", fetchErr)
}
queryResult := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
// iterate through the rows and accumulate the amounts
for _, row := range result {
for _, row := range queryResult {
if _, ok := results[dateStr][row.DestinationChain]; !ok {
results[dateStr][row.DestinationChain] = map[string]float64{"*": 0}
}
@ -187,67 +178,88 @@ func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix st
results[dateStr]["*"][row.TokenSymbol] = results[dateStr]["*"][row.TokenSymbol] + row.Notional
// add to the count for chain/symbol
results[dateStr][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.DestinationChain][row.TokenSymbol] + row.Notional
}
// set the result in the cache
warmAmountsCache[cachePrefix][dateStr] = results[dateStr]
if daysAgo >= 1 {
// set the result in the cache
muWarmTransfersToCache.Lock()
if cacheData, ok := warmTransfersToCache[cachePrefix][dateStr]; !ok || len(cacheData) <= 1 {
// cache does not have this date, persist it for other instances.
warmTransfersToCache[cachePrefix][dateStr] = results[dateStr]
cacheNeedsUpdate = true
}
muWarmTransfersToCache.Unlock()
}
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
// create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
seenSymbolSet := map[string]bool{}
seenChainSet := map[string]bool{}
for date, tokens := range results {
for leaving := range tokens {
seenChainSet[leaving] = true
for key := range results[date][leaving] {
seenSymbolSet[key] = true
}
}
if cacheNeedsUpdate {
persistInterfaceToJson(warmTransfersToCacheFilePath, &muWarmTransfersToCache, warmTransfersToCache)
}
// ensure each chain object has all the same symbol keys:
for date := range results {
for leaving := range results[date] {
// loop through seen chains
for chain := range seenChainSet {
// check that date has all the chains
if _, ok := results[date][chain]; !ok {
results[date][chain] = map[string]float64{"*": 0}
}
}
// loop through seen symbols
for token := range seenSymbolSet {
// check that the chain has all the symbols
if _, ok := results[date][leaving][token]; !ok {
// add the missing key to the map
results[date][leaving][token] = 0
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same chain keys
seenChainSet := map[string]bool{}
for _, chains := range results {
for leaving := range chains {
if _, ok := seenChainSet[leaving]; !ok {
seenChainSet[leaving] = true
}
}
}
return results, nil
var muResult sync.RWMutex
// ensure each chain object has all the same symbol keys:
for date, chains := range results {
// loop through seen chains
for chain := range seenChainSet {
// check that date has all the chains
if _, ok := chains[chain]; !ok {
muResult.Lock()
results[date][chain] = map[string]float64{"*": 0}
muResult.Unlock()
}
}
}
return results
}
func transferredToSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
result := map[string]map[string]float64{"*": {"*": 0}}
dailyTotals := amountsTransferredToInInterval(tbl, ctx, prefix, start)
// loop through the daily totals to accumulate the overall amounts
for _, chains := range dailyTotals {
for chain, tokens := range chains {
// ensure the chain exists in the result map
if _, ok := result[chain]; !ok {
result[chain] = map[string]float64{"*": 0}
}
for symbol, amount := range tokens {
if _, ok := result[chain][symbol]; !ok {
result[chain][symbol] = 0
}
// add the amount of this symbol transferred this day to the
// amount already in the result (amount of this symbol previously transferred)
result[chain][symbol] = result[chain][symbol] + amount
}
}
}
return result
}
// returns the amount of each symbol transferred to each chain within the interval
func amountsForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) (map[string]map[string]float64, error) {
func transfersToForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]float64 {
// query for all rows in the time range, then aggregate the amounts
results, fetchErr := fetchAmountRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
return nil, fetchErr
}
var total = float64(0)
for _, item := range results {
total = total + item.Notional
}
queryResults := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
result := map[string]map[string]float64{"*": {"*": total}}
result := map[string]map[string]float64{"*": {"*": 0}}
// iterate through the rows and accumulate the amounts for each index
for _, row := range results {
for _, row := range queryResults {
if _, ok := result[row.DestinationChain]; !ok {
result[row.DestinationChain] = map[string]float64{"*": 0}
}
@ -257,13 +269,13 @@ func amountsForInterval(tbl *bigtable.Table, ctx context.Context, prefix string,
result["*"][row.TokenSymbol] = result["*"][row.TokenSymbol] + row.Notional
// add to symbol amount
result[row.DestinationChain][row.TokenSymbol] = result[row.DestinationChain][row.TokenSymbol] + row.Notional
// add to all chains/all symbols total
result["*"]["*"] = result["*"]["*"] + row.Notional
}
return result, nil
return result
}
// get number of recent transactions in the last 24 hours, and daily for a period
// optionally group by an EmitterChain or EmitterAddress
// optionally query for recent rows of a given EmitterChain or EmitterAddress
// finds the value that has been transferred to each chain, by symbol.
func NotionalTransferredTo(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
@ -361,33 +373,29 @@ func NotionalTransferredTo(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup
// total of last 24 hours
var last24HourCount map[string]map[string]float64
last24HourCount := map[string]map[string]float64{}
if last24Hours != "" {
wg.Add(1)
go func(prefix string) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = amountsForInterval(tbl, ctx, prefix, start, now)
for chain, tokens := range last24HourCount {
transfers := transfersToForInterval(tbl, ctx, prefix, start, now)
for chain, tokens := range transfers {
last24HourCount[chain] = map[string]float64{}
for symbol, amount := range tokens {
last24HourCount[chain][symbol] = roundToTwoDecimalPlaces(amount)
}
}
if err != nil {
log.Printf("failed getting count for 24h interval, err: %v", err)
}
}(prefix)
}
// total of the last numDays
var periodCount map[string]map[string]float64
periodTransfers := map[string]map[string]float64{}
if forPeriod != "" {
wg.Add(1)
go func(prefix string) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
@ -396,36 +404,39 @@ func NotionalTransferredTo(w http.ResponseWriter, r *http.Request) {
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
periodCount, err = amountsForInterval(tbl, ctx, prefix, start, now)
for chain, tokens := range periodCount {
transfers := transferredToSinceDate(tbl, ctx, prefix, start)
for chain, tokens := range transfers {
periodTransfers[chain] = map[string]float64{}
for symbol, amount := range tokens {
periodCount[chain][symbol] = roundToTwoDecimalPlaces(amount)
periodTransfers[chain][symbol] = roundToTwoDecimalPlaces(amount)
}
}
if err != nil {
log.Printf("failed getting count for numDays interval, err: %v\n", err)
}
}(prefix)
}
// daily totals
var dailyTotals map[string]map[string]map[string]float64
dailyTransfers := map[string]map[string]map[string]float64{}
if daily != "" {
wg.Add(1)
go func(prefix string, queryDays int) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
dailyTotals, err = createAmountsOfInterval(tbl, ctx, prefix, queryDays)
for date, chains := range dailyTotals {
transfers := amountsTransferredToInInterval(tbl, ctx, prefix, start)
for date, chains := range transfers {
dailyTransfers[date] = map[string]map[string]float64{}
for chain, tokens := range chains {
dailyTransfers[date][chain] = map[string]float64{}
for symbol, amount := range tokens {
dailyTotals[date][chain][symbol] = roundToTwoDecimalPlaces(amount)
dailyTransfers[date][chain][symbol] = roundToTwoDecimalPlaces(amount)
}
}
}
if err != nil {
log.Fatalf("failed getting createCountsOfInterval err %v", err)
}
}(prefix, queryDays)
}
@ -433,9 +444,9 @@ func NotionalTransferredTo(w http.ResponseWriter, r *http.Request) {
result := &amountsResult{
Last24Hours: last24HourCount,
WithinPeriod: periodCount,
WithinPeriod: periodTransfers,
PeriodDurationDays: queryDays,
Daily: dailyTotals,
Daily: dailyTransfers,
}
jsonBytes, err := json.Marshal(result)

View File

@ -2,9 +2,7 @@
package p
import (
"bytes"
"context"
"encoding/binary"
"encoding/json"
"fmt"
"io"
@ -24,81 +22,18 @@ type transfersResult struct {
Daily map[string]map[string]map[string]map[string]float64
}
// warmCache keeps some data around between invocations, so that we don't have
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
// TODO - make a struct for cache
// an in-memory cache of previously calculated results
var warmTransfersCache = map[string]map[string]map[string]map[string]map[string]float64{}
var muWarmTransfersCache sync.RWMutex
var warmTransfersCacheFilePath = "/notional-transferred-cache.json"
func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]TransferData, error) {
rows := []TransferData{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
t := &TransferData{}
if _, ok := row[transferDetailsFam]; ok {
for _, item := range row[transferDetailsFam] {
switch item.Column {
case "TokenTransferDetails:Amount":
amount, _ := strconv.ParseFloat(string(item.Value), 64)
t.TokenAmount = amount
case "TokenTransferDetails:NotionalUSD":
reader := bytes.NewReader(item.Value)
var notionalFloat float64
if err := binary.Read(reader, binary.BigEndian, &notionalFloat); err != nil {
log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
}
t.Notional = notionalFloat
case "TokenTransferDetails:OriginSymbol":
t.TokenSymbol = string(item.Value)
}
}
if _, ok := row[transferPayloadFam]; ok {
for _, item := range row[transferPayloadFam] {
switch item.Column {
case "TokenTransferPayload:OriginChain":
t.OriginChain = string(item.Value)
case "TokenTransferPayload:TargetChain":
t.DestinationChain = string(item.Value)
}
}
}
t.LeavingChain = row.Key()[:1]
rows = append(rows, *t)
}
return true
}, bigtable.RowFilter(
bigtable.ConditionFilter(
bigtable.ChainFilters(
bigtable.FamilyFilter(columnFamilies[1]),
bigtable.CellsPerRowLimitFilter(1), // only the first cell in column
bigtable.TimestampRangeFilter(start, end), // within time range
bigtable.StripValueFilter(), // no columns/values, just the row.Key()
),
bigtable.ChainFilters(
bigtable.FamilyFilter(fmt.Sprintf("%v|%v", columnFamilies[2], columnFamilies[5])),
bigtable.ColumnFilter("Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain"),
bigtable.LatestNFilter(1),
),
bigtable.BlockAllFilter(),
),
))
if err != nil {
fmt.Println("failed reading rows to create RowList.", err)
return nil, err
}
return rows, err
}
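// A note on the filter above (semantics of the cloud.google.com/go/bigtable
// package): ConditionFilter(predicate, trueFilter, falseFilter) evaluates the
// predicate chain per row. Here the predicate matches rows that have a cell
// in the given timestamp range; matching rows are read through the column
// chain (Amount|NotionalUSD|OriginSymbol|OriginChain|TargetChain, latest cell
// only), and all other rows are dropped by BlockAllFilter.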
func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int) (map[string]map[string]map[string]map[string]float64, error) {
var mu sync.RWMutex
// finds the daily amount of each symbol transferred from each chain, to each chain,
// from the specified start to the present.
func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]map[string]float64 {
results := map[string]map[string]map[string]map[string]float64{}
now := time.Now().UTC()
numPrevDays := int(now.Sub(start).Hours() / 24)
var intervalsWG sync.WaitGroup
// there will be a query for each previous day, plus today
@ -107,6 +42,8 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
// create the unique identifier for this query, for cache
cachePrefix := createCachePrefix(prefix)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
// start is the start of the day (SOD), end is the end of the day (EOD)
@ -127,133 +64,192 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
dateStr := start.Format("2006-01-02")
mu.Lock()
muWarmTransfersCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
// check to see if there is cache data for this date/query
if dates, ok := warmTransfersCache[cachePrefix]; ok {
// have a cache for this query
if dateCache, ok := dates[dateStr]; ok {
if dateCache, ok := dates[dateStr]; ok && len(dateCache) > 1 {
// have a cache for this date
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = dateCache
mu.Unlock()
muWarmTransfersCache.Unlock()
intervalsWG.Done()
return
}
} else {
// no cache for this query
warmTransfersCache[cachePrefix][dateStr] = map[string]map[string]map[string]float64{}
}
} else {
// no cache for this date, initialize the map
// no cache for this query, initialize the map
warmTransfersCache[cachePrefix] = map[string]map[string]map[string]map[string]float64{}
warmTransfersCache[cachePrefix][dateStr] = map[string]map[string]map[string]float64{}
}
mu.Unlock()
var result []TransferData
var fetchErr error
muWarmTransfersCache.Unlock()
defer intervalsWG.Done()
result, fetchErr = fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
queryResult := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Fatalf("fetchTransferRowsInInterval returned an error: %v\n", fetchErr)
}
// iterate through the rows and increment the count
for _, row := range result {
// iterate through the rows and increment the amounts
for _, row := range queryResult {
if _, ok := results[dateStr][row.LeavingChain]; !ok {
results[dateStr][row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
}
if _, ok := results[dateStr][row.LeavingChain][row.DestinationChain]; !ok {
results[dateStr][row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
}
if _, ok := results[dateStr]["*"][row.DestinationChain]; !ok {
results[dateStr]["*"][row.DestinationChain] = map[string]float64{"*": 0}
}
// add the transfer data to the result set every possible way:
// by symbol, aggregated by: "leaving chain", "arriving at chain", "from any chain", "to any chain".
// add to the total count
// add to the total amount leaving this chain, going to any chain, for all symbols
results[dateStr][row.LeavingChain]["*"]["*"] = results[dateStr][row.LeavingChain]["*"]["*"] + row.Notional
// add to the total amount leaving this chain, going to the destination chain, for all symbols
results[dateStr][row.LeavingChain][row.DestinationChain]["*"] = results[dateStr][row.LeavingChain][row.DestinationChain]["*"] + row.Notional
// add to the count for chain/symbol
// add to the total amount of this symbol leaving this chain, going to any chain
results[dateStr][row.LeavingChain]["*"][row.TokenSymbol] = results[dateStr][row.LeavingChain]["*"][row.TokenSymbol] + row.Notional
// add to the total amount of this symbol leaving this chain, going to the destination chain
results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
// add to the total amount arriving at the destination chain, coming from anywhere, including all symbols
results[dateStr]["*"][row.DestinationChain]["*"] = results[dateStr]["*"][row.DestinationChain]["*"] + row.Notional
// add to the total amount of this symbol arriving at the destination chain
results[dateStr]["*"][row.DestinationChain][row.TokenSymbol] = results[dateStr]["*"][row.DestinationChain][row.TokenSymbol] + row.Notional
// add to the total amount of this symbol transferred, from any chain, to any chain
results[dateStr]["*"]["*"][row.TokenSymbol] = results[dateStr]["*"]["*"][row.TokenSymbol] + row.Notional
// and finally, total/total/total: amount of all symbols transferred from any chain to any other chain
results[dateStr]["*"]["*"]["*"] = results[dateStr]["*"]["*"]["*"] + row.Notional
}
if daysAgo >= 1 {
// set the result in the cache
muWarmTransfersCache.Lock()
if cacheData, ok := warmTransfersCache[cachePrefix][dateStr]; !ok || len(cacheData) == 1 {
// cache does not have this date, add the data, and mark the cache stale
warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
cacheNeedsUpdate = true
}
muWarmTransfersCache.Unlock()
}
// set the result in the cache
warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmTransfersCacheFilePath, &muWarmTransfersCache, warmTransfersCache)
}
// having consistent keys in each object is helpful for clients, e.g. the explorer GUI
// create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys
seenSymbolSet := map[string]bool{}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same chain keys
seenChainSet := map[string]bool{}
for date, tokens := range results {
for leaving, dests := range tokens {
for _, chains := range results {
for leaving, dests := range chains {
seenChainSet[leaving] = true
for dest := range dests {
for key := range results[date][leaving][dest] {
seenSymbolSet[key] = true
}
seenChainSet[dest] = true
}
}
}
var muResult sync.RWMutex
// ensure each chain object has all the same symbol keys:
for date := range results {
for leaving := range results[date] {
for dest := range results[date][leaving] {
for chain := range seenChainSet {
// check that date has all the chains
if _, ok := results[date][leaving][chain]; !ok {
results[date][leaving][chain] = map[string]float64{"*": 0}
}
}
for token := range seenSymbolSet {
if _, ok := results[date][leaving][token]; !ok {
// add the missing key to the map
results[date][leaving][dest][token] = 0
}
for date, chains := range results {
for chain := range seenChainSet {
if _, ok := chains[chain]; !ok {
muResult.Lock()
results[date][chain] = map[string]map[string]float64{"*": {"*": 0}}
muResult.Unlock()
}
}
for leaving := range chains {
for chain := range seenChainSet {
// check that date has all the chains
if _, ok := chains[chain]; !ok {
muResult.Lock()
results[date][leaving][chain] = map[string]float64{"*": 0}
muResult.Unlock()
}
}
}
}
return results, nil
return results
}
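// Sketch of the nesting produced above (hypothetical chain IDs and values,
// not part of this commit): results are keyed
// date -> leaving chain -> destination chain -> symbol, with "*" standing in
// for "any" at every level:
//
//	results["2021-12-12"]["2"]["3"]["UST"] = 10   // chain 2 -> chain 3, UST only
//	results["2021-12-12"]["2"]["*"]["*"]   = 42   // everything leaving chain 2
//	results["2021-12-12"]["*"]["*"]["*"]   = 1337 // everything that moved that day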
// calculates the amount of each symbol that has gone from each chain, to each other chain, since the specified day.
func transferredSinceDate(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
result := map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
dailyTotals := createTransfersOfInterval(tbl, ctx, prefix, start)
for _, leaving := range dailyTotals {
for chain, dests := range leaving {
// ensure the chain exists in the result map
if _, ok := result[chain]; !ok {
result[chain] = map[string]map[string]float64{"*": {"*": 0}}
}
for dest, tokens := range dests {
if _, ok := result[chain][dest]; !ok {
result[chain][dest] = map[string]float64{"*": 0}
}
for symbol, amount := range tokens {
if _, ok := result[chain][dest][symbol]; !ok {
result[chain][dest][symbol] = 0
}
// add the amount of this symbol transferred this day to the
// amount already in the result (amount of this symbol previously transferred)
result[chain][dest][symbol] = result[chain][dest][symbol] + amount
}
}
}
}
return result
}
// returns the amount of each symbol transferred from each chain to each destination chain within the interval
func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) (map[string]map[string]map[string]float64, error) {
func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) map[string]map[string]map[string]float64 {
// query for all rows in the time range, then aggregate the amounts
results, fetchErr := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
return nil, fetchErr
}
var total = float64(0)
for _, item := range results {
total = total + item.Notional
}
queryResults := fetchTransferRowsInInterval(tbl, ctx, prefix, start, end)
result := map[string]map[string]map[string]float64{"*": {"*": {"*": total}}}
result := map[string]map[string]map[string]float64{"*": {"*": {"*": 0}}}
// iterate through the rows and accumulate the amounts for each index
for _, row := range results {
for _, row := range queryResults {
if _, ok := result[row.LeavingChain]; !ok {
result[row.LeavingChain] = map[string]map[string]float64{"*": {"*": 0}}
}
if _, ok := result[row.LeavingChain][row.DestinationChain]; !ok {
result[row.LeavingChain][row.DestinationChain] = map[string]float64{"*": 0}
}
// ensure the destination chain exists under the any-chain key
if _, ok := result["*"][row.DestinationChain]; !ok {
result["*"][row.DestinationChain] = map[string]float64{"*": 0}
}
// add the transfer data to the result set every possible way:
// by symbol, aggregated by: "leaving chain", "arriving at chain", "from any chain", "to any chain".
// add to the total amount leaving this chain, going to any chain, for all symbols
result[row.LeavingChain]["*"]["*"] = result[row.LeavingChain]["*"]["*"] + row.Notional
// add to the total amount leaving this chain, going to the destination chain, for all symbols
result[row.LeavingChain][row.DestinationChain]["*"] = result[row.LeavingChain][row.DestinationChain]["*"] + row.Notional
// add to symbol amount
// add to the total amount of this symbol leaving this chain, going to any chain
result[row.LeavingChain]["*"][row.TokenSymbol] = result[row.LeavingChain]["*"][row.TokenSymbol] + row.Notional
// add to the total amount of this symbol leaving this chain, going to the destination chain
result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] = result[row.LeavingChain][row.DestinationChain][row.TokenSymbol] + row.Notional
// add to the total amount arriving at the destination chain, coming from anywhere, including all symbols
result["*"][row.DestinationChain]["*"] = result["*"][row.DestinationChain]["*"] + row.Notional
// add to the total amount of this symbol arriving at the destination chain
result["*"][row.DestinationChain][row.TokenSymbol] = result["*"][row.DestinationChain][row.TokenSymbol] + row.Notional
// add to the total amount of this symbol transferred, from any chain, to any chain
result["*"]["*"][row.TokenSymbol] = result["*"]["*"][row.TokenSymbol] + row.Notional
// and finally, total/total/total: amount of all symbols transferred from any chain to any other chain
result["*"]["*"]["*"] = result["*"]["*"]["*"] + row.Notional
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same keys.
@ -270,12 +266,10 @@ func transfersForInterval(tbl *bigtable.Table, ctx context.Context, prefix strin
}
}
}
return result, nil
return result
}
// get number of recent transactions in the last 24 hours, and daily for a period
// optionally group by an EmitterChain or EmitterAddress
// optionally query for recent rows of a given EmitterChain or EmitterAddress
// finds the value that has been transferred from each chain to each other, by symbol.
func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
@ -373,35 +367,33 @@ func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
var wg sync.WaitGroup
// total of last 24 hours
var last24HourCount map[string]map[string]map[string]float64
last24HourCount := map[string]map[string]map[string]float64{}
if last24Hours != "" {
wg.Add(1)
go func(prefix string) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = transfersForInterval(tbl, ctx, prefix, start, now)
for chain, dests := range last24HourCount {
transfers := transfersForInterval(tbl, ctx, prefix, start, now)
for chain, dests := range transfers {
last24HourCount[chain] = map[string]map[string]float64{}
for dest, tokens := range dests {
last24HourCount[chain][dest] = map[string]float64{}
for symbol, amount := range tokens {
last24HourCount[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
}
}
}
if err != nil {
log.Printf("failed getting count for 24h interval, err: %v", err)
}
}(prefix)
}
// total of the last numDays
var periodCount map[string]map[string]map[string]float64
// transfers of the last numDays
periodTransfers := map[string]map[string]map[string]float64{}
if forPeriod != "" {
wg.Add(1)
go func(prefix string) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
@ -410,40 +402,43 @@ func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
periodCount, err = transfersForInterval(tbl, ctx, prefix, start, now)
for chain, dests := range periodCount {
transfers := transferredSinceDate(tbl, ctx, prefix, start)
for chain, dests := range transfers {
periodTransfers[chain] = map[string]map[string]float64{}
for dest, tokens := range dests {
periodTransfers[chain][dest] = map[string]float64{}
for symbol, amount := range tokens {
periodCount[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
periodTransfers[chain][dest][symbol] = roundToTwoDecimalPlaces(amount)
}
}
}
if err != nil {
log.Printf("failed getting count for numDays interval, err: %v\n", err)
}
}(prefix)
}
// daily totals
var dailyTotals map[string]map[string]map[string]map[string]float64
dailyTransfers := map[string]map[string]map[string]map[string]float64{}
if daily != "" {
wg.Add(1)
go func(prefix string, queryDays int) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
dailyTotals, err = createTransfersOfInterval(tbl, ctx, prefix, queryDays)
for date, chains := range dailyTotals {
transfers := createTransfersOfInterval(tbl, ctx, prefix, start)
for date, chains := range transfers {
dailyTransfers[date] = map[string]map[string]map[string]float64{}
for chain, dests := range chains {
dailyTransfers[date][chain] = map[string]map[string]float64{}
for destChain, tokens := range dests {
dailyTransfers[date][chain][destChain] = map[string]float64{}
for symbol, amount := range tokens {
dailyTotals[date][chain][destChain][symbol] = roundToTwoDecimalPlaces(amount)
dailyTransfers[date][chain][destChain][symbol] = roundToTwoDecimalPlaces(amount)
}
}
}
}
if err != nil {
log.Fatalf("failed getting createCountsOfInterval err %v", err)
}
}(prefix, queryDays)
}
@ -451,9 +446,9 @@ func NotionalTransferred(w http.ResponseWriter, r *http.Request) {
result := &transfersResult{
Last24Hours: last24HourCount,
WithinPeriod: periodCount,
WithinPeriod: periodTransfers,
PeriodDurationDays: queryDays,
Daily: dailyTotals,
Daily: dailyTransfers,
}
jsonBytes, err := json.Marshal(result)

View File

@ -2,12 +2,14 @@ package p
import (
"context"
"encoding/json"
"log"
"math"
"net/http"
"os"
"strings"
"sync"
"time"
"cloud.google.com/go/bigtable"
"cloud.google.com/go/pubsub"
@ -28,6 +30,14 @@ var pubSubTokenTransferDetailsTopic *pubsub.Topic
var coinGeckoCoins = map[string][]CoinGeckoCoin{}
var solanaTokens = map[string]SolanaToken{}
var releaseDay = time.Date(2021, 9, 13, 0, 0, 0, 0, time.UTC)
var pwd string
func initCache(waitgroup *sync.WaitGroup, filePath string, mutex *sync.RWMutex, cacheInterface interface{}) {
defer waitgroup.Done()
loadJsonToInterface(filePath, mutex, cacheInterface)
}
// init runs during cloud function initialization, so it will only run during
// an instance's cold start.
// https://cloud.google.com/functions/docs/bestpractices/networking#accessing_google_apis
@ -63,6 +73,99 @@ func init() {
solanaTokens = fetchSolanaTokenList()
}
pwd, _ = os.Getwd()
// initialize in-memory caches
var initWG sync.WaitGroup
initWG.Add(1)
// populates cache used by amountsTransferredToInInterval
go initCache(&initWG, warmTransfersToCacheFilePath, &muWarmTransfersToCache, &warmTransfersToCache)
initWG.Add(1)
// populates cache used by createTransfersOfInterval
go initCache(&initWG, warmTransfersCacheFilePath, &muWarmTransfersCache, &warmTransfersCache)
initWG.Add(1)
// populates cache used by createAddressesOfInterval
go initCache(&initWG, warmAddressesCacheFilePath, &muWarmAddressesCache, &warmAddressesCache)
initWG.Add(1)
// populates cache used by transferredToSince
go initCache(&initWG, transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, &transferredToUpToYesterday)
// TODO: initialize the cache used by transferredSince once it persists a flat file.
initWG.Add(1)
// populates cache used by addressesTransferredToSince
go initCache(&initWG, addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, &addressesToUpToYesterday)
initWG.Add(1)
// populates cache used by createCumulativeAmountsOfInterval
go initCache(&initWG, warmCumulativeCacheFilePath, &muWarmCumulativeCache, &warmCumulativeCache)
initWG.Add(1)
// populates cache used by createCumulativeAddressesOfInterval
go initCache(&initWG, warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, &warmCumulativeAddressesCache)
initWG.Wait()
log.Println("done initializing caches, starting.")
}
var gcpCachePath = "/workspace/src/p/cache"
func loadJsonToInterface(filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
// create path to the static cache dir
path := gcpCachePath + filePath
// create path to the "hot" cache dir
hotPath := "/tmp" + filePath
if strings.HasSuffix(pwd, "cmd") {
// alter the path to be correct when running locally, and in Tilt devnet
path = "../cache" + filePath
hotPath = ".." + hotPath
}
mutex.Lock()
// first check to see if there is a cache file in the tmp dir of the cloud function.
// if so, this is a long-running instance with a recently generated cache available.
fileData, readErrTmp := os.ReadFile(hotPath)
if readErrTmp != nil {
log.Printf("failed reading from tmp cache %v, err: %v", hotPath, readErrTmp)
var readErr error
fileData, readErr = os.ReadFile(path)
if readErr != nil {
log.Printf("failed reading %v, err: %v", path, readErr)
} else {
log.Printf("successfully read from cache: %v", path)
}
} else {
log.Printf("successfully read from tmp cache: %v", hotPath)
}
unmarshalErr := json.Unmarshal(fileData, &cacheMap)
mutex.Unlock()
if unmarshalErr != nil {
log.Printf("failed unmarshaling %v, err: %v", path, unmarshalErr)
}
}
func persistInterfaceToJson(filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
path := "/tmp" + filePath
if strings.HasSuffix(pwd, "cmd") {
// alter the path to be correct when running locally, and in Tilt devnet
path = "../cache" + filePath
}
mutex.Lock()
cacheBytes, marshalErr := json.MarshalIndent(cacheMap, "", " ")
if marshalErr != nil {
log.Fatal("failed marshaling cacheMap.", marshalErr)
}
writeErr := os.WriteFile(path, cacheBytes, 0666)
mutex.Unlock()
if writeErr != nil {
log.Fatalf("failed writing to file %v, err: %v", path, writeErr)
}
log.Printf("successfully wrote cache to file: %v", path)
}
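// A minimal sketch of the intended round trip (using the names declared with
// one endpoint's cache): after updating a warm cache, an instance persists it
// to /tmp; loadJsonToInterface later prefers that recent /tmp copy and falls
// back to the cache baked into the image under gcpCachePath:
//
//	persistInterfaceToJson(warmTransfersCacheFilePath, &muWarmTransfersCache, warmTransfersCache)
//	loadJsonToInterface(warmTransfersCacheFilePath, &muWarmTransfersCache, &warmTransfersCache)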
var columnFamilies = []string{
@ -371,6 +474,9 @@ func newMux() *http.ServeMux {
mux.HandleFunc("/notionaltransferred", NotionalTransferred)
mux.HandleFunc("/notionaltransferredto", NotionalTransferredTo)
mux.HandleFunc("/notionaltransferredtocumulative", NotionalTransferredToCumulative)
mux.HandleFunc("/addressestransferredto", AddressesTransferredTo)
mux.HandleFunc("/addressestransferredtocumulative", AddressesTransferredToCumulative)
mux.HandleFunc("/totals", Totals)
mux.HandleFunc("/recent", Recent)
mux.HandleFunc("/transaction", Transaction)

View File

@ -298,7 +298,7 @@ func Totals(w http.ResponseWriter, r *http.Request) {
keySegments = 2
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup