wormhole/event_database/cloud_functions/totals.go

382 lines
11 KiB
Go

// Package p contains an HTTP Cloud Function.
package p
import (
"context"
"encoding/json"
"fmt"
"html"
"io"
"log"
"net/http"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/bigtable"
)
const maxNano int = 999999999
type totalsResult struct {
LastDayCount map[string]int
TotalCount map[string]int
TotalCountDurationDays int
DailyTotals map[string]map[string]int
}
// warmCache keeps some data around between invocations, so that we don't have
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
var warmTotalsCache = map[string]map[string]map[string]int{}
var muWarmTotalsCache sync.RWMutex
var warmTotalsCacheFilePath = "totals-cache.json"
// derive the result index relevant to a row.
func makeGroupKey(keySegments int, rowKey string) string {
var countBy string
if keySegments == 0 {
countBy = "*"
} else {
keyParts := strings.Split(rowKey, ":")
countBy = strings.Join(keyParts[:keySegments], ":")
}
return countBy
}
func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]bigtable.Row, error) {
rows := []bigtable.Row{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
rows = append(rows, row)
return true
}, bigtable.RowFilter(
bigtable.ChainFilters(
// combine filters to get only what we need:
bigtable.FamilyFilter(columnFamilies[1]),
bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets)
bigtable.TimestampRangeFilter(start, end), // within time range
bigtable.StripValueFilter(), // no columns/values, just the row.Key()
)))
return rows, err
}
func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
if _, ok := warmTotalsCache["2021-09-13"]; !ok && loadCache {
loadJsonToInterface(ctx, warmTotalsCacheFilePath, &muWarmTotalsCache, &warmTotalsCache)
}
results := map[string]map[string]int{}
now := time.Now().UTC()
var intervalsWG sync.WaitGroup
// there will be a query for each previous day, plus today
intervalsWG.Add(numPrevDays + 1)
// create the unique identifier for this query, for cache
cachePrefix := prefix
if prefix == "" {
cachePrefix = "*"
}
cachePrefix = fmt.Sprintf("%v-%v", cachePrefix, keySegments)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
// start is the SOD, end is EOD
// "0 daysAgo start" is 00:00:00 AM of the current day
// "0 daysAgo end" is 23:59:59 of the current day (the future)
// calulate the start and end times for the query
hoursAgo := (24 * daysAgo)
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
n := now.Add(daysAgoDuration)
year := n.Year()
month := n.Month()
day := n.Day()
loc := n.Location()
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
dateStr := start.Format("2006-01-02")
muWarmTotalsCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]int{"*": 0}
// check to see if there is cache data for this date/query
if dateCache, ok := warmTotalsCache[dateStr]; ok && useCache(dateStr) {
// have a cache for this date
if val, ok := dateCache[cachePrefix]; ok {
// have a cache for this query
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = val
muWarmTotalsCache.Unlock()
intervalsWG.Done()
return
}
}
}
muWarmTotalsCache.Unlock()
var result []bigtable.Row
var fetchErr error
defer intervalsWG.Done()
result, fetchErr = fetchRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Fatalf("fetchRowsInInterval returned an error: %v", fetchErr)
}
// iterate through the rows and increment the count
for _, row := range result {
countBy := makeGroupKey(keySegments, row.Key())
if keySegments != 0 {
// increment the total count
results[dateStr]["*"] = results[dateStr]["*"] + 1
}
results[dateStr][countBy] = results[dateStr][countBy] + 1
}
if daysAgo >= 1 {
muWarmTotalsCache.Lock()
if _, ok := warmTotalsCache[dateStr]; !ok {
warmTotalsCache[dateStr] = map[string]map[string]int{}
}
if _, ok := warmTotalsCache[dateStr][cachePrefix]; !ok {
warmTotalsCache[dateStr][cachePrefix] = map[string]int{}
}
if len(warmTotalsCache[dateStr][cachePrefix]) <= 1 || !useCache(dateStr) {
// set the result in the cache
warmTotalsCache[dateStr][cachePrefix] = results[dateStr]
cacheNeedsUpdate = true
}
muWarmTotalsCache.Unlock()
}
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(ctx, warmTotalsCacheFilePath, &muWarmTotalsCache, warmTotalsCache)
}
// create a set of all the keys from all dates, to ensure the result objects all have the same keys
seenKeySet := map[string]bool{}
for _, v := range results {
for key := range v {
seenKeySet[key] = true
}
}
// ensure each date object has the same keys:
for date := range results {
for key := range seenKeySet {
if _, ok := results[date][key]; !ok {
// add the missing key to the map
results[date][key] = 0
}
}
}
return results, nil
}
// returns the count of the rows in the query response
func messageCountForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time, keySegments int) (map[string]int, error) {
// query for all rows in time range, return result count
results, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
return nil, fetchErr
}
result := map[string]int{"*": len(results)}
// iterate through the rows and increment the count for each index
if keySegments != 0 {
for _, row := range results {
countBy := makeGroupKey(keySegments, row.Key())
result[countBy] = result[countBy] + 1
}
}
return result, nil
}
// get number of recent transactions in the last 24 hours, and daily for a period
// optionally group by a EmitterChain or EmitterAddress
// optionally query for recent rows of a given EmitterChain or EmitterAddress
func Totals(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var last24Hours, numDays, groupBy, forChain, forAddress string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
last24Hours = queryParams.Get("last24Hours")
numDays = queryParams.Get("numDays")
groupBy = queryParams.Get("groupBy")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
readyCheck := queryParams.Get("readyCheck")
if readyCheck != "" {
// for running in devnet
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, html.EscapeString("ready"))
return
}
case http.MethodPost:
// declare request body properties
var d struct {
Last24Hours string `json:"last24Hours"`
NumDays string `json:"numDays"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
last24Hours = d.Last24Hours
numDays = d.NumDays
groupBy = d.GroupBy
forChain = d.ForChain
forAddress = d.ForAddress
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
// default query period is all time
queryDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
// if the request included numDays, set the query period to that
if numDays != "" {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
fmt.Fprint(w, "numDays must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create the rowkey prefix for querying
prefix := ""
if forChain != "" {
prefix = forChain
if groupBy == "" {
// if the request is forChain, and groupBy is empty, set it to groupBy chain
groupBy = "chain"
}
if forAddress != "" {
// if the request is forAddress, always groupBy address
groupBy = "address"
prefix = forChain + ":" + forAddress
}
}
// use the groupBy value to determine how many segements of the rowkey should be used.
keySegments := 0
if groupBy == "chain" {
keySegments = 1
}
if groupBy == "address" {
keySegments = 2
}
ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second)
defer cancel()
var wg sync.WaitGroup
// total of last 24 hours
var last24HourCount map[string]int
if last24Hours != "" {
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = messageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Printf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
}
// daily totals
periodTotals := map[string]int{}
var dailyTotals map[string]map[string]int
wg.Add(1)
go func(prefix string, keySegments int, queryDays int) {
var err error
defer wg.Done()
dailyTotals, err = createCountsOfInterval(tbl, ctx, prefix, queryDays, keySegments)
if err != nil {
log.Fatalf("failed getting createCountsOfInterval err %v", err)
}
// sum all the days to create a map with totals for the query period
for _, vals := range dailyTotals {
for chain, amount := range vals {
periodTotals[chain] += amount
}
}
}(prefix, keySegments, queryDays)
wg.Wait()
result := &totalsResult{
LastDayCount: last24HourCount,
TotalCount: periodTotals,
TotalCountDurationDays: queryDays,
DailyTotals: dailyTotals,
}
jsonBytes, err := json.Marshal(result)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
w.WriteHeader(http.StatusOK)
w.Write(jsonBytes)
}