short-lived cache for /recent function
Change-Id: I762791b352161dc20b14739e492a3dee86a0f047
This commit is contained in:
parent
aae5d7450f
commit
90f7015b93
|
@ -12,27 +12,70 @@ import (
|
||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"time"
|
||||||
|
|
||||||
"cloud.google.com/go/bigtable"
|
"cloud.google.com/go/bigtable"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// warmCache keeps some data around between invocations, so that we don't have
|
||||||
|
// to do a full table scan with each request.
|
||||||
|
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
|
||||||
|
var warmCache = map[string]map[string]string{}
|
||||||
|
var lastCacheReset = time.Now()
|
||||||
|
|
||||||
// query for last of each rowKey prefix
|
// query for last of each rowKey prefix
|
||||||
func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string {
|
func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string {
|
||||||
|
// get cache data for query
|
||||||
|
cachePrefix := prefix
|
||||||
|
if prefix == "" {
|
||||||
|
cachePrefix = "*"
|
||||||
|
}
|
||||||
|
|
||||||
|
var rowSet bigtable.RowSet
|
||||||
|
rowSet = bigtable.PrefixRange(prefix)
|
||||||
|
now := time.Now()
|
||||||
|
oneHourAgo := now.Add(-time.Duration(1) * time.Hour)
|
||||||
|
if oneHourAgo.Before(lastCacheReset) {
|
||||||
|
// cache is less than one hour old, use it
|
||||||
|
if cached, ok := warmCache[cachePrefix]; ok {
|
||||||
|
// use the highest possible sequence number as the range end.
|
||||||
|
maxSeq := "9999999999999999"
|
||||||
|
rowSets := bigtable.RowRangeList{}
|
||||||
|
for k, v := range cached {
|
||||||
|
start := fmt.Sprintf("%v:%v", k, v)
|
||||||
|
end := fmt.Sprintf("%v:%v", k, maxSeq)
|
||||||
|
rowSets = append(rowSets, bigtable.NewRange(start, end))
|
||||||
|
}
|
||||||
|
if len(rowSets) >= 1 {
|
||||||
|
rowSet = rowSets
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// cache is more than hour old, don't use it, reset it
|
||||||
|
warmCache = map[string]map[string]string{}
|
||||||
|
lastCacheReset = now
|
||||||
|
}
|
||||||
|
|
||||||
mostRecentByKeySegment := map[string]string{}
|
mostRecentByKeySegment := map[string]string{}
|
||||||
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
|
err := tbl.ReadRows(ctx, rowSet, func(row bigtable.Row) bool {
|
||||||
|
|
||||||
keyParts := strings.Split(row.Key(), ":")
|
keyParts := strings.Split(row.Key(), ":")
|
||||||
groupByKey := strings.Join(keyParts[:2], ":")
|
groupByKey := strings.Join(keyParts[:2], ":")
|
||||||
mostRecentByKeySegment[groupByKey] = row.Key()
|
mostRecentByKeySegment[groupByKey] = keyParts[2]
|
||||||
|
|
||||||
return true
|
return true
|
||||||
// TODO - add filter to only return rows created within the last 30(?) days
|
// TODO - add filter to only return rows created within the last 30(?) days
|
||||||
}, bigtable.RowFilter(bigtable.StripValueFilter()))
|
}, bigtable.RowFilter(
|
||||||
|
bigtable.ChainFilters(
|
||||||
|
bigtable.CellsPerRowLimitFilter(1),
|
||||||
|
bigtable.StripValueFilter(),
|
||||||
|
)))
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Fatalf("failed to read recent rows: %v", err)
|
log.Fatalf("failed to read recent rows: %v", err)
|
||||||
}
|
}
|
||||||
|
// update the cache with the latest rows
|
||||||
|
warmCache[cachePrefix] = mostRecentByKeySegment
|
||||||
return mostRecentByKeySegment
|
return mostRecentByKeySegment
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -44,10 +87,10 @@ func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string
|
||||||
// key/value pairs are the start/stop rowKeys for range queries
|
// key/value pairs are the start/stop rowKeys for range queries
|
||||||
rangePairs := map[string]string{}
|
rangePairs := map[string]string{}
|
||||||
|
|
||||||
for _, highestSequenceKey := range latest {
|
for prefixGroup, highestSequence := range latest {
|
||||||
rowKeyParts := strings.Split(highestSequenceKey, ":")
|
rowKeyParts := strings.Split(prefixGroup, ":")
|
||||||
// convert the sequence part of the rowkey from a string to an int, so it can be used for math
|
// convert the sequence part of the rowkey from a string to an int, so it can be used for math
|
||||||
highSequence, _ := strconv.Atoi(rowKeyParts[2])
|
highSequence, _ := strconv.Atoi(highestSequence)
|
||||||
lowSequence := highSequence - numRowsToFetch
|
lowSequence := highSequence - numRowsToFetch
|
||||||
// create a rowKey to use as the start of the range query
|
// create a rowKey to use as the start of the range query
|
||||||
rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)
|
rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)
|
||||||
|
|
Loading…
Reference in New Issue