async queries for /totals function
Change-Id: If9f1267fc713e547657fbd60e7390ed59f5b9d03
This commit is contained in:
parent
90f7015b93
commit
82b4ed5843
|
@ -57,55 +57,65 @@ func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string
|
||||||
}
|
}
|
||||||
|
|
||||||
func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
|
func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
|
||||||
|
var mu sync.RWMutex
|
||||||
results := map[string]map[string]int{}
|
results := map[string]map[string]int{}
|
||||||
// key track of all the keys seen, to ensure the result objects all have the same keys
|
// key track of all the keys seen, to ensure the result objects all have the same keys
|
||||||
seenKeySet := map[string]bool{}
|
seenKeySet := map[string]bool{}
|
||||||
|
|
||||||
now := time.Now()
|
now := time.Now()
|
||||||
|
|
||||||
|
var intervalsWG sync.WaitGroup
|
||||||
|
// there will be a query for each previous day, plus today
|
||||||
|
intervalsWG.Add(numPrevDays + 1)
|
||||||
|
|
||||||
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
|
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
|
||||||
|
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
|
||||||
|
// start is the SOD, end is EOD
|
||||||
|
// "0 daysAgo start" is 00:00:00 AM of the current day
|
||||||
|
// "0 daysAgo end" is 23:59:59 of the current day (the future)
|
||||||
|
|
||||||
// start is the SOD, end is EOD
|
// calulate the start and end times for the query
|
||||||
// "0 daysAgo start" is 00:00:00 AM of the current day
|
hoursAgo := (24 * daysAgo)
|
||||||
// "0 daysAgo end" is 23:59:59 of the current day (the future)
|
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
|
||||||
|
n := now.Add(daysAgoDuration)
|
||||||
|
year := n.Year()
|
||||||
|
month := n.Month()
|
||||||
|
day := n.Day()
|
||||||
|
loc := n.Location()
|
||||||
|
|
||||||
// calulate the start and end times for the query
|
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
|
||||||
hoursAgo := (24 * daysAgo)
|
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
|
||||||
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
|
|
||||||
n := now.Add(daysAgoDuration)
|
|
||||||
year := n.Year()
|
|
||||||
month := n.Month()
|
|
||||||
day := n.Day()
|
|
||||||
loc := n.Location()
|
|
||||||
|
|
||||||
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
|
var result []bigtable.Row
|
||||||
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
|
var fetchErr error
|
||||||
|
|
||||||
result, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
|
defer intervalsWG.Done()
|
||||||
if fetchErr != nil {
|
result, fetchErr = fetchRowsInInterval(tbl, ctx, prefix, start, end)
|
||||||
log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
|
|
||||||
return nil, fetchErr
|
|
||||||
}
|
|
||||||
|
|
||||||
dateStr := start.Format("2006-01-02")
|
if fetchErr != nil {
|
||||||
|
log.Fatalf("fetchRowsInInterval returned an error: %v", fetchErr)
|
||||||
// initialize the map for this date in the result set
|
|
||||||
if results[dateStr] == nil {
|
|
||||||
results[dateStr] = map[string]int{"*": 0}
|
|
||||||
}
|
|
||||||
// iterate through the rows and increment the count
|
|
||||||
for _, row := range result {
|
|
||||||
countBy := makeGroupKey(keySegments, row.Key())
|
|
||||||
if keySegments != 0 {
|
|
||||||
// increment the total count
|
|
||||||
results[dateStr]["*"] = results[dateStr]["*"] + 1
|
|
||||||
}
|
}
|
||||||
results[dateStr][countBy] = results[dateStr][countBy] + 1
|
|
||||||
|
|
||||||
// add this key to the set
|
dateStr := start.Format("2006-01-02")
|
||||||
seenKeySet[countBy] = true
|
mu.Lock()
|
||||||
}
|
// initialize the map for this date in the result set
|
||||||
|
if results[dateStr] == nil {
|
||||||
|
results[dateStr] = map[string]int{"*": 0}
|
||||||
|
}
|
||||||
|
// iterate through the rows and increment the count
|
||||||
|
for _, row := range result {
|
||||||
|
countBy := makeGroupKey(keySegments, row.Key())
|
||||||
|
if keySegments != 0 {
|
||||||
|
// increment the total count
|
||||||
|
results[dateStr]["*"] = results[dateStr]["*"] + 1
|
||||||
|
}
|
||||||
|
results[dateStr][countBy] = results[dateStr][countBy] + 1
|
||||||
|
|
||||||
|
// add this key to the set
|
||||||
|
seenKeySet[countBy] = true
|
||||||
|
}
|
||||||
|
mu.Unlock()
|
||||||
|
}(tbl, ctx, prefix, daysAgo)
|
||||||
}
|
}
|
||||||
|
|
||||||
// ensure each date object has the same keys:
|
// ensure each date object has the same keys:
|
||||||
|
@ -117,6 +127,7 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
intervalsWG.Wait()
|
||||||
|
|
||||||
return results, nil
|
return results, nil
|
||||||
}
|
}
|
||||||
|
|
Loading…
Reference in New Issue