From 90f7015b934ecbf6acc9d44827828b698b80c774 Mon Sep 17 00:00:00 2001 From: justinschuldt Date: Fri, 29 Oct 2021 03:50:46 -0500 Subject: [PATCH] short lived cache for /recent function Change-Id: I762791b352161dc20b14739e492a3dee86a0f047 --- event_database/cloud_functions/recent.go | 55 +++++++++++++++++++++--- 1 file changed, 49 insertions(+), 6 deletions(-) diff --git a/event_database/cloud_functions/recent.go b/event_database/cloud_functions/recent.go index 4aeb70203..de2249ada 100644 --- a/event_database/cloud_functions/recent.go +++ b/event_database/cloud_functions/recent.go @@ -12,27 +12,70 @@ import ( "sort" "strconv" "strings" + "time" "cloud.google.com/go/bigtable" ) +// warmCache keeps some data around between invocations, so that we don't have +// to do a full table scan with each request. +// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations +var warmCache = map[string]map[string]string{} +var lastCacheReset = time.Now() + // query for last of each rowKey prefix func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string { + // get cache data for query + cachePrefix := prefix + if prefix == "" { + cachePrefix = "*" + } + + var rowSet bigtable.RowSet + rowSet = bigtable.PrefixRange(prefix) + now := time.Now() + oneHourAgo := now.Add(-time.Duration(1) * time.Hour) + if oneHourAgo.Before(lastCacheReset) { + // cache is less than one hour old, use it + if cached, ok := warmCache[cachePrefix]; ok { + // use the highest possible sequence number as the range end. + maxSeq := "9999999999999999" + rowSets := bigtable.RowRangeList{} + for k, v := range cached { + start := fmt.Sprintf("%v:%v", k, v) + end := fmt.Sprintf("%v:%v", k, maxSeq) + rowSets = append(rowSets, bigtable.NewRange(start, end)) + } + if len(rowSets) >= 1 { + rowSet = rowSets + } + } + } else { + // cache is more than hour old, don't use it, reset it + warmCache = map[string]map[string]string{} + lastCacheReset = now + } mostRecentByKeySegment := map[string]string{} - err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool { + err := tbl.ReadRows(ctx, rowSet, func(row bigtable.Row) bool { keyParts := strings.Split(row.Key(), ":") groupByKey := strings.Join(keyParts[:2], ":") - mostRecentByKeySegment[groupByKey] = row.Key() + mostRecentByKeySegment[groupByKey] = keyParts[2] return true // TODO - add filter to only return rows created within the last 30(?) days - }, bigtable.RowFilter(bigtable.StripValueFilter())) + }, bigtable.RowFilter( + bigtable.ChainFilters( + bigtable.CellsPerRowLimitFilter(1), + bigtable.StripValueFilter(), + ))) if err != nil { log.Fatalf("failed to read recent rows: %v", err) } + // update the cache with the latest rows + warmCache[cachePrefix] = mostRecentByKeySegment return mostRecentByKeySegment } @@ -44,10 +87,10 @@ func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string // key/value pairs are the start/stop rowKeys for range queries rangePairs := map[string]string{} - for _, highestSequenceKey := range latest { - rowKeyParts := strings.Split(highestSequenceKey, ":") + for prefixGroup, highestSequence := range latest { + rowKeyParts := strings.Split(prefixGroup, ":") // convert the sequence part of the rowkey from a string to an int, so it can be used for math - highSequence, _ := strconv.Atoi(rowKeyParts[2]) + highSequence, _ := strconv.Atoi(highestSequence) lowSequence := highSequence - numRowsToFetch // create a rowKey to use as the start of the range query rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)