BigTable: in-memory cache for /totals endpoint
Change-Id: I48199951b42ab8d57bb12fef6b6b1e0da9f3392f commit-id:13588560
This commit is contained in:
parent
92a9abfc74
commit
fb7e2c1cca
|
@ -25,6 +25,11 @@ type totalsResult struct {
|
||||||
DailyTotals map[string]map[string]int
|
DailyTotals map[string]map[string]int
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// warmCache keeps some data around between invocations, so that we don't have
|
||||||
|
// to do a full table scan with each request.
|
||||||
|
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
|
||||||
|
var warmTotalsCache = map[string]map[string]map[string]int{}
|
||||||
|
|
||||||
// derive the result index relevant to a row.
|
// derive the result index relevant to a row.
|
||||||
func makeGroupKey(keySegments int, rowKey string) string {
|
func makeGroupKey(keySegments int, rowKey string) string {
|
||||||
var countBy string
|
var countBy string
|
||||||
|
@ -68,6 +73,13 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
|
||||||
// there will be a query for each previous day, plus today
|
// there will be a query for each previous day, plus today
|
||||||
intervalsWG.Add(numPrevDays + 1)
|
intervalsWG.Add(numPrevDays + 1)
|
||||||
|
|
||||||
|
// create the unique identifier for this query, for cache
|
||||||
|
cachePrefix := prefix
|
||||||
|
if prefix == "" {
|
||||||
|
cachePrefix = "*"
|
||||||
|
}
|
||||||
|
cachePrefix = fmt.Sprintf("%v-%v", cachePrefix, keySegments)
|
||||||
|
|
||||||
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
|
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
|
||||||
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
|
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
|
||||||
// start is the SOD, end is EOD
|
// start is the SOD, end is EOD
|
||||||
|
@ -86,6 +98,35 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
|
||||||
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
|
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
|
||||||
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
|
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
|
||||||
|
|
||||||
|
dateStr := start.Format("2006-01-02")
|
||||||
|
|
||||||
|
mu.Lock()
|
||||||
|
// initialize the map for this date in the result set
|
||||||
|
results[dateStr] = map[string]int{"*": 0}
|
||||||
|
// check to see if there is cache data for this date/query
|
||||||
|
if dateCache, ok := warmTotalsCache[dateStr]; ok {
|
||||||
|
// have a cache for this date
|
||||||
|
|
||||||
|
if val, ok := dateCache[cachePrefix]; ok {
|
||||||
|
// have a cache for this query
|
||||||
|
if daysAgo >= 1 {
|
||||||
|
// only use the cache for yesterday and older
|
||||||
|
results[dateStr] = val
|
||||||
|
mu.Unlock()
|
||||||
|
intervalsWG.Done()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// no cache for this query
|
||||||
|
warmTotalsCache[dateStr][cachePrefix] = map[string]int{}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
// no cache for this date, initialize the map
|
||||||
|
warmTotalsCache[dateStr] = map[string]map[string]int{}
|
||||||
|
warmTotalsCache[dateStr][cachePrefix] = map[string]int{}
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
|
||||||
var result []bigtable.Row
|
var result []bigtable.Row
|
||||||
var fetchErr error
|
var fetchErr error
|
||||||
|
|
||||||
|
@ -96,12 +137,6 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
|
||||||
log.Fatalf("fetchRowsInInterval returned an error: %v", fetchErr)
|
log.Fatalf("fetchRowsInInterval returned an error: %v", fetchErr)
|
||||||
}
|
}
|
||||||
|
|
||||||
dateStr := start.Format("2006-01-02")
|
|
||||||
mu.Lock()
|
|
||||||
// initialize the map for this date in the result set
|
|
||||||
if results[dateStr] == nil {
|
|
||||||
results[dateStr] = map[string]int{"*": 0}
|
|
||||||
}
|
|
||||||
// iterate through the rows and increment the count
|
// iterate through the rows and increment the count
|
||||||
for _, row := range result {
|
for _, row := range result {
|
||||||
countBy := makeGroupKey(keySegments, row.Key())
|
countBy := makeGroupKey(keySegments, row.Key())
|
||||||
|
@ -114,10 +149,14 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
|
||||||
// add this key to the set
|
// add this key to the set
|
||||||
seenKeySet[countBy] = true
|
seenKeySet[countBy] = true
|
||||||
}
|
}
|
||||||
mu.Unlock()
|
|
||||||
|
// set the result in the cache
|
||||||
|
warmTotalsCache[dateStr][cachePrefix] = results[dateStr]
|
||||||
}(tbl, ctx, prefix, daysAgo)
|
}(tbl, ctx, prefix, daysAgo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
intervalsWG.Wait()
|
||||||
|
|
||||||
// ensure each date object has the same keys:
|
// ensure each date object has the same keys:
|
||||||
for _, v := range results {
|
for _, v := range results {
|
||||||
for key := range seenKeySet {
|
for key := range seenKeySet {
|
||||||
|
@ -127,7 +166,6 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
intervalsWG.Wait()
|
|
||||||
|
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue