From f4b4a5378ee67f97817afa87b67b8bbd8e28ef81 Mon Sep 17 00:00:00 2001 From: justinschuldt Date: Fri, 19 Nov 2021 16:14:59 -0600 Subject: [PATCH] BigTable endpoint: notional transfered to chain Change-Id: I588b178429e8a18f65a564569a3c3a2e43beb35c commit-id:dc6d4478 --- .../cloud_functions/notional-transfered-to.go | 421 ++++++++++++++++++ event_database/cloud_functions/shared.go | 1 + 2 files changed, 422 insertions(+) create mode 100644 event_database/cloud_functions/notional-transfered-to.go diff --git a/event_database/cloud_functions/notional-transfered-to.go b/event_database/cloud_functions/notional-transfered-to.go new file mode 100644 index 00000000..5cb07d61 --- /dev/null +++ b/event_database/cloud_functions/notional-transfered-to.go @@ -0,0 +1,421 @@ +// Package p contains an HTTP Cloud Function. +package p + +import ( + "bytes" + "context" + "encoding/binary" + "encoding/json" + "fmt" + "io" + "log" + "math" + "net/http" + "strconv" + "sync" + "time" + + "cloud.google.com/go/bigtable" +) + +type amountsResult struct { + LastDayCount map[string]map[string]float64 + TotalCount map[string]map[string]float64 + DailyTotals map[string]map[string]map[string]float64 +} + +// warmCache keeps some data around between invocations, so that we don't have +// to do a full table scan with each request. +// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations +// TODO - make a struct for cache +var warmAmountsCache = map[string]map[string]map[string]map[string]float64{} + +type TransferData struct { + TokenSymbol string + TokenName string + TokenAmount float64 + OriginChain string + LeavingChain string + DestinationChain string + Notional float64 +} + +func roundToTwoDecimalPlaces(num float64) float64 { + return math.Round(num*100) / 100 +} + +func fetchAmountRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]TransferData, error) { + rows := []TransferData{} + err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool { + + t := &TransferData{} + if _, ok := row[transferDetailsFam]; ok { + for _, item := range row[transferDetailsFam] { + switch item.Column { + case "TokenTransferDetails:Amount": + amount, _ := strconv.ParseFloat(string(item.Value), 64) + t.TokenAmount = amount + case "TokenTransferDetails:NotionalUSD": + reader := bytes.NewReader(item.Value) + var notionalFloat float64 + if err := binary.Read(reader, binary.BigEndian, ¬ionalFloat); err != nil { + log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err) + } + t.Notional = notionalFloat + case "TokenTransferDetails:OriginSymbol": + t.TokenSymbol = string(item.Value) + case "TokenTransferDetails:OriginName": + t.TokenName = string(item.Value) + } + } + + if _, ok := row[transferPayloadFam]; ok { + for _, item := range row[transferPayloadFam] { + switch item.Column { + case "TokenTransferPayload:OriginChain": + t.OriginChain = string(item.Value) + case "TokenTransferPayload:TargetChain": + t.DestinationChain = string(item.Value) + } + } + } + + t.LeavingChain = row.Key()[:1] + + rows = append(rows, *t) + } + + return true + + }, bigtable.RowFilter( + bigtable.ConditionFilter( + bigtable.ChainFilters( + bigtable.FamilyFilter(columnFamilies[1]), + bigtable.CellsPerRowLimitFilter(1), // only the first cell in column + bigtable.TimestampRangeFilter(start, end), // within time range + bigtable.StripValueFilter(), // no columns/values, just the row.Key() + ), + bigtable.ChainFilters( + bigtable.PassAllFilter(), + bigtable.LatestNFilter(1), + ), + bigtable.BlockAllFilter(), + ), + )) + if err != nil { + fmt.Println("failed reading rows to create RowList.", err) + return nil, err + } + return rows, err +} + +func createAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int) (map[string]map[string]map[string]float64, error) { + var mu sync.RWMutex + results := map[string]map[string]map[string]float64{} + + now := time.Now().UTC() + + var intervalsWG sync.WaitGroup + // there will be a query for each previous day, plus today + intervalsWG.Add(numPrevDays + 1) + + // create the unique identifier for this query, for cache + cachePrefix := prefix + if prefix == "" { + cachePrefix = "*" + } + + for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ { + go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) { + // start is the SOD, end is EOD + // "0 daysAgo start" is 00:00:00 AM of the current day + // "0 daysAgo end" is 23:59:59 of the current day (the future) + + // calulate the start and end times for the query + hoursAgo := (24 * daysAgo) + daysAgoDuration := -time.Duration(hoursAgo) * time.Hour + n := now.Add(daysAgoDuration) + year := n.Year() + month := n.Month() + day := n.Day() + loc := n.Location() + + start := time.Date(year, month, day, 0, 0, 0, 0, loc) + end := time.Date(year, month, day, 23, 59, 59, maxNano, loc) + + dateStr := start.Format("2006-01-02") + + mu.Lock() + // initialize the map for this date in the result set + results[dateStr] = map[string]map[string]float64{} + // check to see if there is cache data for this date/query + if dateCache, ok := warmAmountsCache[dateStr]; ok { + // have a cache for this date + + if val, ok := dateCache[cachePrefix]; ok { + // have a cache for this query + if daysAgo >= 1 { + // only use the cache for yesterday and older + results[dateStr] = val + mu.Unlock() + intervalsWG.Done() + return + } + } else { + // no cache for this query + warmAmountsCache[dateStr][cachePrefix] = map[string]map[string]float64{} + } + } else { + // no cache for this date, initialize the map + warmAmountsCache[dateStr] = map[string]map[string]map[string]float64{} + warmAmountsCache[dateStr][cachePrefix] = map[string]map[string]float64{} + } + mu.Unlock() + + var result []TransferData + var fetchErr error + + defer intervalsWG.Done() + + result, fetchErr = fetchAmountRowsInInterval(tbl, ctx, prefix, start, end) + + if fetchErr != nil { + log.Fatalf("fetchAmountRowsInInterval returned an error: %v\n", fetchErr) + } + + // iterate through the rows and increment the count + for _, row := range result { + if _, ok := results[dateStr][row.DestinationChain]; !ok { + results[dateStr][row.DestinationChain] = map[string]float64{"*": 0} + } + // add to the total count + results[dateStr][row.DestinationChain]["*"] = results[dateStr][row.DestinationChain]["*"] + row.Notional + // add to the count for chain/symbol + results[dateStr][row.DestinationChain][row.TokenSymbol] = results[dateStr][row.DestinationChain][row.TokenSymbol] + row.Notional + + } + // set the result in the cache + warmAmountsCache[dateStr][cachePrefix] = results[dateStr] + }(tbl, ctx, prefix, daysAgo) + } + + intervalsWG.Wait() + + // create a set of all the keys from all dates/chains/symbols, to ensure the result objects all have the same keys + seenKeySet := map[string]bool{} + for date, tokens := range results { + for leaving, _ := range tokens { + for key := range results[date][leaving] { + seenKeySet[key] = true + } + } + } + // ensure each chain object has all the same symbol keys: + for date := range results { + for leaving := range results[date] { + for token := range seenKeySet { + if _, ok := results[date][leaving][token]; !ok { + // add the missing key to the map + results[date][leaving][token] = 0 + } + } + } + } + + return results, nil +} + +// returns the count of the rows in the query response +func amountsForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) (map[string]map[string]float64, error) { + // query for all rows in time range, return result count + results, fetchErr := fetchAmountRowsInInterval(tbl, ctx, prefix, start, end) + if fetchErr != nil { + log.Printf("fetchRowsInInterval returned an error: %v", fetchErr) + return nil, fetchErr + } + var total = float64(0) + for _, item := range results { + total = total + item.Notional + } + + result := map[string]map[string]float64{"*": {"*": total}} + + // iterate through the rows and increment the count for each index + for _, row := range results { + if _, ok := result[row.DestinationChain]; !ok { + result[row.DestinationChain] = map[string]float64{"*": 0} + } + // add to total amount + result[row.DestinationChain]["*"] = result[row.DestinationChain]["*"] + row.Notional + // add to symbol amount + result[row.DestinationChain][row.TokenSymbol] = result[row.DestinationChain][row.TokenSymbol] + row.Notional + } + return result, nil +} + +// get number of recent transactions in the last 24 hours, and daily for a period +// optionally group by a EmitterChain or EmitterAddress +// optionally query for recent rows of a given EmitterChain or EmitterAddress +func NotionalTransferedTo(w http.ResponseWriter, r *http.Request) { + // Set CORS headers for the preflight request + if r.Method == http.MethodOptions { + w.Header().Set("Access-Control-Allow-Origin", "*") + w.Header().Set("Access-Control-Allow-Methods", "POST") + w.Header().Set("Access-Control-Allow-Headers", "Content-Type") + w.Header().Set("Access-Control-Max-Age", "3600") + w.WriteHeader(http.StatusNoContent) + return + } + // Set CORS headers for the main request. + w.Header().Set("Access-Control-Allow-Origin", "*") + + var numDays, forChain, forAddress string + + // allow GET requests with querystring params, or POST requests with json body. + switch r.Method { + case http.MethodGet: + queryParams := r.URL.Query() + numDays = queryParams.Get("numDays") + forChain = queryParams.Get("forChain") + forAddress = queryParams.Get("forAddress") + + case http.MethodPost: + // declare request body properties + var d struct { + NumDays string `json:"numDays"` + ForChain string `json:"forChain"` + ForAddress string `json:"forAddress"` + } + + // deserialize request body + if err := json.NewDecoder(r.Body).Decode(&d); err != nil { + switch err { + case io.EOF: + // do nothing, empty body is ok + default: + log.Printf("json.NewDecoder: %v", err) + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + } + + numDays = d.NumDays + forChain = d.ForChain + forAddress = d.ForAddress + + default: + http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed) + log.Println("Method Not Allowed") + return + } + + var queryDays int + if numDays == "" { + queryDays = 30 + } else { + var convErr error + queryDays, convErr = strconv.Atoi(numDays) + if convErr != nil { + fmt.Fprint(w, "numDays must be an integer") + http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest) + return + } + } + + // create the rowkey prefix for querying + prefix := "" + if forChain != "" { + prefix = forChain + // if the request is forChain, always groupBy chain + if forAddress != "" { + // if the request is forAddress, always groupBy address + prefix = forChain + ":" + forAddress + } + } + + ctx, cancel := context.WithTimeout(context.Background(), 60*time.Second) + defer cancel() + + var wg sync.WaitGroup + + // total of last 24 hours + var last24HourCount map[string]map[string]float64 + wg.Add(1) + go func(prefix string) { + var err error + last24HourInterval := -time.Duration(24) * time.Hour + now := time.Now().UTC() + start := now.Add(last24HourInterval) + defer wg.Done() + last24HourCount, err = amountsForInterval(tbl, ctx, prefix, start, now) + for chain, tokens := range last24HourCount { + for symbol, amount := range tokens { + last24HourCount[chain][symbol] = roundToTwoDecimalPlaces(amount) + } + } + if err != nil { + log.Printf("failed getting count for 24h interval, err: %v", err) + } + }(prefix) + + // total of the last numDays + var periodCount map[string]map[string]float64 + wg.Add(1) + go func(prefix string) { + var err error + hours := (24 * queryDays) + periodInterval := -time.Duration(hours) * time.Hour + + now := time.Now().UTC() + prev := now.Add(periodInterval) + start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location()) + + defer wg.Done() + periodCount, err = amountsForInterval(tbl, ctx, prefix, start, now) + for chain, tokens := range periodCount { + for symbol, amount := range tokens { + periodCount[chain][symbol] = roundToTwoDecimalPlaces(amount) + } + } + if err != nil { + log.Printf("failed getting count for numDays interval, err: %v\n", err) + } + }(prefix) + + // daily totals + var dailyTotals map[string]map[string]map[string]float64 + wg.Add(1) + go func(prefix string, queryDays int) { + var err error + defer wg.Done() + dailyTotals, err = createAmountsOfInterval(tbl, ctx, prefix, queryDays) + for date, chains := range dailyTotals { + for chain, tokens := range chains { + for symbol, amount := range tokens { + dailyTotals[date][chain][symbol] = roundToTwoDecimalPlaces(amount) + } + } + } + if err != nil { + log.Fatalf("failed getting createCountsOfInterval err %v", err) + } + }(prefix, queryDays) + + wg.Wait() + + result := &amountsResult{ + LastDayCount: last24HourCount, + TotalCount: periodCount, + DailyTotals: dailyTotals, + } + + jsonBytes, err := json.Marshal(result) + if err != nil { + w.WriteHeader(http.StatusInternalServerError) + w.Write([]byte(err.Error())) + log.Println(err.Error()) + return + } + w.WriteHeader(http.StatusOK) + w.Write(jsonBytes) +} diff --git a/event_database/cloud_functions/shared.go b/event_database/cloud_functions/shared.go index a960e751..1181f10f 100644 --- a/event_database/cloud_functions/shared.go +++ b/event_database/cloud_functions/shared.go @@ -347,6 +347,7 @@ func Entry(w http.ResponseWriter, r *http.Request) { func newMux() *http.ServeMux { mux := http.NewServeMux() + mux.HandleFunc("/notionaltransferedto", NotionalTransferedTo) mux.HandleFunc("/totals", Totals) mux.HandleFunc("/recent", Recent) mux.HandleFunc("/transaction", Transaction)