Explorer functions - use GSC for cache documents

commit-id:08e1c0c3
This commit is contained in:
justinschuldt 2022-02-27 10:08:31 -06:00 committed by Justin Schuldt
parent 69390882af
commit a89350745e
15 changed files with 332 additions and 242 deletions

View File

@ -4,10 +4,13 @@
.vscode
.env
cmd
*.md
*.txt
*.json
Dockerfile

View File

@ -27,14 +27,18 @@ type cumulativeAddressesResult struct {
// an in-memory cache of previously calculated results
var warmCumulativeAddressesCache = map[string]map[string]map[string]map[string]float64{}
var muWarmCumulativeAddressesCache sync.RWMutex
var warmCumulativeAddressesCacheFilePath = "/addresses-transferred-to-cumulative-cache.json"
var warmCumulativeAddressesCacheFilePath = "addresses-transferred-to-cumulative-cache.json"
var addressesToUpToYesterday = map[string]map[string]map[string]map[string]float64{}
var muAddressesToUpToYesterday sync.RWMutex
var addressesToUpToYesterdayFilePath = "/addresses-transferred-to-up-to-yesterday-cache.json"
var addressesToUpToYesterdayFilePath = "addresses-transferred-to-up-to-yesterday-cache.json"
// finds all the unique addresses that have received tokens since a particular moment.
func addressesTransferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
if _, ok := addressesToUpToYesterday["*"]; !ok {
loadJsonToInterface(ctx, addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, &addressesToUpToYesterday)
}
now := time.Now().UTC()
today := now.Format("2006-01-02")
oneDayAgo := -time.Duration(24) * time.Hour
@ -90,7 +94,7 @@ func addressesTransferredToSince(tbl *bigtable.Table, ctx context.Context, prefi
addressesToUpToYesterday[cachePrefix][yesterday] = upToYesterday
muAddressesToUpToYesterday.Unlock()
// write cache to disc
persistInterfaceToJson(addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, addressesToUpToYesterday)
persistInterfaceToJson(ctx, addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, addressesToUpToYesterday)
} else {
muAddressesToUpToYesterday.Unlock()
}
@ -100,6 +104,10 @@ func addressesTransferredToSince(tbl *bigtable.Table, ctx context.Context, prefi
// calcuates a map of recepient address to notional value received, by chain, since the start time specified.
func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
if _, ok := warmCumulativeAddressesCache["*"]; !ok {
loadJsonToInterface(ctx, warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, &warmCumulativeAddressesCache)
}
now := time.Now().UTC()
today := now.Format("2006-01-02")
@ -125,7 +133,7 @@ func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Contex
// of each token transfer by symbol, based on the destination of the transfer.
for i, date := range dateKeys {
muWarmCumulativeAddressesCache.RLock()
if dateCache, ok := warmCumulativeAddressesCache[cachePrefix][date]; ok && dateCache != nil {
if dateCache, ok := warmCumulativeAddressesCache[cachePrefix][date]; ok && dateCache != nil && useCache(date) {
// have a cached value for this day, use it.
results[date] = dateCache
muWarmCumulativeAddressesCache.RUnlock()
@ -175,7 +183,7 @@ func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Contex
if date != today {
// set the result in the cache
muWarmCumulativeAddressesCache.Lock()
if _, ok := warmCumulativeAddressesCache[cachePrefix][date]; !ok {
if _, ok := warmCumulativeAddressesCache[cachePrefix][date]; !ok || !useCache(date) {
// cache does not have this date, persist it for other instances.
warmCumulativeAddressesCache[cachePrefix][date] = results[date]
cacheNeedsUpdate = true
@ -186,7 +194,7 @@ func createCumulativeAddressesOfInterval(tbl *bigtable.Table, ctx context.Contex
}
if cacheNeedsUpdate {
persistInterfaceToJson(warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, warmCumulativeAddressesCache)
persistInterfaceToJson(ctx, warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, warmCumulativeAddressesCache)
}
selectDays := map[string]map[string]map[string]float64{}

View File

@ -30,7 +30,7 @@ type addressesResult struct {
// an in-memory cache of previously calculated results
var warmAddressesCache = map[string]map[string]map[string]map[string]float64{}
var muWarmAddressesCache sync.RWMutex
var warmAddressesCacheFilePath = "/addresses-transferred-to-cache.json"
var warmAddressesCacheFilePath = "addresses-transferred-to-cache.json"
type AddressData struct {
TokenSymbol string
@ -110,6 +110,10 @@ func fetchAddressRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix
// finds unique addresses tokens have been sent to, for each day since the start time passed in.
func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
if _, ok := warmAddressesCache["*"]; !ok {
loadJsonToInterface(ctx, warmAddressesCacheFilePath, &muWarmAddressesCache, &warmAddressesCache)
}
results := map[string]map[string]map[string]float64{}
now := time.Now().UTC()
@ -151,7 +155,7 @@ func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
if dates, ok := warmAddressesCache[cachePrefix]; ok {
// have a cache for this query
if dateCache, ok := dates[dateStr]; ok {
if dateCache, ok := dates[dateStr]; ok && useCache(dateStr) {
// have a cache for this date
if daysAgo >= 1 {
// only use the cache for yesterday and older
@ -183,7 +187,7 @@ func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
if daysAgo >= 1 {
// set the result in the cache
muWarmAddressesCache.Lock()
if _, ok := warmAddressesCache[cachePrefix][dateStr]; !ok {
if _, ok := warmAddressesCache[cachePrefix][dateStr]; !ok || !useCache(dateStr) {
// cache does not have this date, persist it for other instances.
warmAddressesCache[cachePrefix][dateStr] = results[dateStr]
cacheNeedsUpdate = true
@ -196,7 +200,7 @@ func createAddressesOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmAddressesCacheFilePath, &muWarmAddressesCache, warmAddressesCache)
persistInterfaceToJson(ctx, warmAddressesCacheFilePath, &muWarmAddressesCache, warmAddressesCache)
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same keys

View File

@ -88,6 +88,16 @@ func chainIdToCoinGeckoPlatform(chain vaa.ChainID) string {
return "binance-smart-chain"
case vaa.ChainIDPolygon:
return "polygon-pos"
case vaa.ChainIDAvalanche:
return "avalanche"
case vaa.ChainIDOasis:
return "oasis"
case vaa.ChainIDAlgorand:
return "algorand"
case vaa.ChainIDFantom:
return "fantom"
case vaa.ChainIDEthereumRopsten:
return "ethereum"
}
return ""
}

View File

@ -7,8 +7,8 @@ go 1.16
require (
cloud.google.com/go/bigtable v1.12.0
cloud.google.com/go/pubsub v1.17.1
github.com/GoogleCloudPlatform/functions-framework-go v1.5.2 // indirect
github.com/certusone/wormhole/node v0.0.0-20211115153408-0a93202f6e5d
cloud.google.com/go/storage v1.18.2
github.com/certusone/wormhole/node v0.0.0-20220225194731-7e461f489cbc
github.com/cosmos/cosmos-sdk v0.44.0
github.com/gagliardetto/solana-go v1.0.2
github.com/holiman/uint256 v1.2.0

View File

@ -44,10 +44,9 @@ cloud.google.com/go/bigtable v1.12.0/go.mod h1:W96Adxrf90LlA4fuB+UjFu/Y8OpoaK7y2
cloud.google.com/go/datastore v1.0.0/go.mod h1:LXYbyblFSglQ5pkeyhO+Qmw7ukd3C+pD7TKLgZqpHYE=
cloud.google.com/go/datastore v1.1.0/go.mod h1:umbIZjpQpHh4hmRpGhH4tLFup+FVzqBi1b3c64qFpCk=
cloud.google.com/go/firestore v1.1.0/go.mod h1:ulACoGHTpvq5r8rxGJ4ddJZBZqakUQqClKRT5SZwBmk=
cloud.google.com/go/functions v1.0.0 h1:cOFEDJ3sgAFRjRULSUJ0Q8cw9qFa5JdpXIBWoNX5uDw=
cloud.google.com/go/functions v1.0.0/go.mod h1:O9KS8UweFVo6GbbbCBKh5yEzbW08PVkg2spe3RfPMd4=
cloud.google.com/go/kms v1.0.0 h1:YkIeqPXqTAlwXk3Z2/WG0d6h1tqJQjU354WftjEoP9E=
cloud.google.com/go/kms v1.0.0/go.mod h1:nhUehi+w7zht2XrUfvTRNpxrfayBHqP4lu2NSywui/0=
cloud.google.com/go/logging v1.4.2/go.mod h1:jco9QZSx8HiVVqLJReq7z7bVdj0P1Jb9PDFs63T+axo=
cloud.google.com/go/pubsub v1.0.1/go.mod h1:R0Gpsv3s54REJCy4fxDixWD93lHJMoZTyQ2kNxGRt3I=
cloud.google.com/go/pubsub v1.1.0/go.mod h1:EwwdRX2sKPjnvnqCa270oGRyludottCI76h+R3AArQw=
cloud.google.com/go/pubsub v1.2.0/go.mod h1:jhfEVHT8odbXTkndysNHCcx0awwzvfOlguIAii9o8iA=
@ -59,6 +58,8 @@ cloud.google.com/go/storage v1.5.0/go.mod h1:tpKbwo567HUNpVclU5sGELwQWBDZ8gh0Zeo
cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohlUTyfDhBk=
cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs=
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
cloud.google.com/go/storage v1.18.2 h1:5NQw6tOn3eMm0oE8vTkfjau18kjL79FlMjy/CHTpmoY=
cloud.google.com/go/storage v1.18.2/go.mod h1:AiIj7BWXyhO5gGVmYJ+S8tbkCx3yb0IMjua8Aw4naVM=
collectd.org v0.3.0/go.mod h1:A/8DzQBkF6abtvrT2j/AU/4tiBgJWYyh0y/oB/4MlWE=
contrib.go.opencensus.io/exporter/stackdriver v0.12.6/go.mod h1:8x999/OcIPy5ivx/wDiV7Gx4D+VUPODf0mWRGRc5kSk=
contrib.go.opencensus.io/exporter/stackdriver v0.13.4 h1:ksUxwH3OD5sxkjzEqGxNTl+Xjsmu3BnC/300MhSVTSc=
@ -100,8 +101,6 @@ github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t
github.com/DataDog/zstd v1.4.5/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/GeertJohan/go.incremental v1.0.0/go.mod h1:6fAjUhbVuX1KcMD3c8TEgVUqmo4seqhv0i0kdATSkM0=
github.com/GeertJohan/go.rice v1.0.0/go.mod h1:eH6gbSOAUv07dQuZVnBmoDP8mgsM1rtixis4Tib9if0=
github.com/GoogleCloudPlatform/functions-framework-go v1.5.2 h1:fPYZMZ8BSK2jfZ28VG6vYxr/PTLbG+9USn8njzxfmWM=
github.com/GoogleCloudPlatform/functions-framework-go v1.5.2/go.mod h1:pq+lZy4vONJ5fjd3q/B6QzWhfHPAbuVweLpxZzMOb9Y=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/Kubuxu/go-os-helper v0.0.1/go.mod h1:N8B+I7vPCT80IcP58r50u4+gEEcsZETFUpAzWW2ep1Y=
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
@ -199,8 +198,8 @@ github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
github.com/cenkalti/backoff/v4 v4.1.1/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/certusone/wormhole/node v0.0.0-20211115153408-0a93202f6e5d h1:ToxXpT4bPkE4waJY4c2ta6vqHIfVhiONyzRouU0gB1g=
github.com/certusone/wormhole/node v0.0.0-20211115153408-0a93202f6e5d/go.mod h1:YncgdSOYam7ELyXFo7PFCj6tUo0pe6cjlj+O3Vt28mo=
github.com/certusone/wormhole/node v0.0.0-20220225194731-7e461f489cbc h1:6WL34LK795x0qei17O5QEvKONTt5VJT3FCGRHHsl81M=
github.com/certusone/wormhole/node v0.0.0-20220225194731-7e461f489cbc/go.mod h1:TAPmwJ1XPdSxZHWBdZjJHos0JmQM6WMCNWbT167fs1s=
github.com/cespare/cp v0.1.0/go.mod h1:SOGHArjBr4JWaSDEVpWpo/hNg6RoKrls6Oh40hiwW+s=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
@ -212,8 +211,6 @@ github.com/circonus-labs/circonus-gometrics v2.3.1+incompatible/go.mod h1:nmEj6D
github.com/circonus-labs/circonusllhist v0.1.3/go.mod h1:kMXHVDlOchFAehlya5ePtbp5jckzBHf4XRpQvBOLI+I=
github.com/clbanning/x2j v0.0.0-20191024224557-825249438eec/go.mod h1:jMjuTZXRI4dUb/I5gc9Hdhagfvm9+RyrPryS/auMzxE=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
github.com/cloudevents/sdk-go/v2 v2.6.1 h1:yHtzgmeBvc0TZx1nrnvYXov1CSvkQyvhEhNMs8Z5Mmk=
github.com/cloudevents/sdk-go/v2 v2.6.1/go.mod h1:nlXhgFkf0uTopxmRXalyMwS2LG70cRGPrxzmjJgSG0U=
github.com/cloudflare/cloudflare-go v0.10.2-0.20190916151808-a80f83b9add9/go.mod h1:1MxXX1Ux4x6mqPmjkUgTP1CdXIBXKX7T+Jk9Gxrmx+U=
github.com/cloudflare/cloudflare-go v0.14.0/go.mod h1:EnwdgGMaFOruiPZRFSgn+TsQ3hQ7C/YWzIGLeu5c304=
github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc=
@ -256,7 +253,6 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E=
github.com/daaku/go.zipexe v1.0.0/go.mod h1:z8IiR6TsVLEYKwXAoE/I+8ys/sDkgTzSL0CLnGVd57E=
github.com/danieljoos/wincred v1.0.2/go.mod h1:SnuYRW9lp1oJrZX/dXJqr0cPK5gYXqx3EJbmjhLdK9U=
github.com/dave/jennifer v1.2.0/go.mod h1:fIb+770HOpJ2fmN9EPPKOqm1vMGhB+TwXKMZhrIygKg=
@ -456,9 +452,11 @@ github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/
github.com/google/gofuzz v1.1.1-0.20200604201612-c04b05f3adfa/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/google/gopacket v1.1.17/go.mod h1:UdDNZ1OO62aGYVnPhxT1U6aI7ukYtA/kB8vaU0diBUM=
github.com/google/gopacket v1.1.19/go.mod h1:iJ8V8n6KS+z2U1A8pUwu8bW5SyEMkXJB8Yo/Vo+TKTo=
github.com/google/martian v2.1.0+incompatible h1:/CP5g8u/VJHijgedC/Legn3BAbAaWPgecwXBIDzw5no=
github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs=
github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0=
github.com/google/martian/v3 v3.2.1 h1:d8MncMlErDFTwQGBK1xhv026j9kqhvw1Qv9IbWT1VLQ=
github.com/google/martian/v3 v3.2.1/go.mod h1:oBOf6HBosgwRXnUGWUB05QECsc6uvmMiJ3+6W4l/CUk=
github.com/google/orderedcode v0.0.1/go.mod h1:iVyU4/qPKHY5h/wSd6rZZCDcLJNxiWO6dvsYES2Sb20=
github.com/google/pprof v0.0.0-20181206194817-3ea8567a2e57/go.mod h1:zfwlbNMJ+OItoe0UupaVj+oy1omPYYDuagoSzA8v9mc=
@ -481,7 +479,6 @@ github.com/google/uuid v1.0.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.5/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.2.0 h1:qJYtXnJRWmpe7m/3XlyhrsLrEURqHRM2kxzoxXqyUDs=
github.com/google/uuid v1.2.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/googleapis/gax-go v2.0.0+incompatible h1:j0GKcs05QVmm7yesiZq2+9cxHkNK9YM6zKx4D2qucQU=
github.com/googleapis/gax-go v2.0.0+incompatible/go.mod h1:SFVmujtThgffbyetf+mdk2eWhX2bMyUtNHzFKcPA9HY=
@ -692,7 +689,6 @@ github.com/kr/pretty v0.2.1/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfn
github.com/kr/pty v1.1.1/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/pty v1.1.3/go.mod h1:pFQYn66WHrOpPYNljwOMqo10TkYh1fy3cYio2l3bCsQ=
github.com/kr/text v0.1.0/go.mod h1:4Jbv+DJW3UT/LiOwJeYQe1efqtUx/iVham/4vfdArNI=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw=
github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8=
github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgxpxOKII=
@ -1891,6 +1887,7 @@ google.golang.org/genproto v0.0.0-20210917145530-b395a37504d4/go.mod h1:eFjDcFEc
google.golang.org/genproto v0.0.0-20210921142501-181ce0d877f6/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20210924002016-3dee208752a0/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211008145708-270636b82663/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211016002631-37fc39342514/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211019152133-63b7e35f4404/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=
google.golang.org/genproto v0.0.0-20211027162914-98a5263abeca h1:+e+aQDO4/c9KaG8PXWHTc6/+Du6kz+BKcXCSnV4SSTE=
google.golang.org/genproto v0.0.0-20211027162914-98a5263abeca/go.mod h1:5CzLGKJ67TSI2B9POpiiyGha0AjJvZIUgRMt1dSmuhc=

View File

@ -20,6 +20,8 @@ import (
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
var warmNFTCache = map[string]map[string]map[string]int{}
var muWarmNFTCache sync.RWMutex
var warmNFTCacheFilePath = "nft-cache.json"
func fetchNFTRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]bigtable.Row, error) {
rows := []bigtable.Row{}
@ -49,7 +51,10 @@ func fetchNFTRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix str
}
func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
var mu sync.RWMutex
if _, ok := warmNFTCache["2021-09-13"]; !ok {
loadJsonToInterface(ctx, warmNFTCacheFilePath, &muWarmNFTCache, &warmNFTCache)
}
results := map[string]map[string]int{}
now := time.Now().UTC()
@ -64,6 +69,7 @@ func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
cachePrefix = "*"
}
cachePrefix = fmt.Sprintf("%v-%v", cachePrefix, keySegments)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
@ -85,11 +91,11 @@ func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
dateStr := start.Format("2006-01-02")
mu.Lock()
muWarmNFTCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]int{"*": 0}
// check to see if there is cache data for this date/query
if dateCache, ok := warmNFTCache[dateStr]; ok {
if dateCache, ok := warmNFTCache[dateStr]; ok && useCache(dateStr) {
// have a cache for this date
if val, ok := dateCache[cachePrefix]; ok {
@ -97,7 +103,7 @@ func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = val
mu.Unlock()
muWarmNFTCache.Unlock()
intervalsWG.Done()
return
}
@ -110,7 +116,7 @@ func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
warmNFTCache[dateStr] = map[string]map[string]int{}
warmNFTCache[dateStr][cachePrefix] = map[string]int{}
}
mu.Unlock()
muWarmNFTCache.Unlock()
var result []bigtable.Row
var fetchErr error
@ -132,13 +138,22 @@ func createNFTCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
results[dateStr][countBy] = results[dateStr][countBy] + 1
}
// set the result in the cache
warmNFTCache[dateStr][cachePrefix] = results[dateStr]
if cacheData, ok := warmNFTCache[dateStr][cachePrefix]; !ok || len(cacheData) <= 1 {
// set the result in the cache
muWarmNFTCache.Lock()
warmNFTCache[dateStr][cachePrefix] = results[dateStr]
muWarmNFTCache.Unlock()
cacheNeedsUpdate = true
}
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(ctx, warmNFTCacheFilePath, &muWarmNFTCache, warmNFTCache)
}
// create a set of all the keys from all dates, to ensure the result objects all have the same keys
seenKeySet := map[string]bool{}
for _, v := range results {
@ -196,12 +211,13 @@ func NFTs(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, groupBy, forChain, forAddress string
var last24Hours, numDays, groupBy, forChain, forAddress string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
last24Hours = queryParams.Get("last24Hours")
numDays = queryParams.Get("numDays")
groupBy = queryParams.Get("groupBy")
forChain = queryParams.Get("forChain")
@ -218,10 +234,11 @@ func NFTs(w http.ResponseWriter, r *http.Request) {
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
Last24Hours string `json:"last24Hours"`
NumDays string `json:"numDays"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
}
// deserialize request body
@ -236,6 +253,7 @@ func NFTs(w http.ResponseWriter, r *http.Request) {
}
}
last24Hours = d.Last24Hours
numDays = d.NumDays
groupBy = d.GroupBy
forChain = d.ForChain
@ -247,10 +265,11 @@ func NFTs(w http.ResponseWriter, r *http.Request) {
return
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
// default query period is all time
queryDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
// if the request included numDays, set the query period to that
if numDays != "" {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
@ -291,39 +310,22 @@ func NFTs(w http.ResponseWriter, r *http.Request) {
// total of last 24 hours
var last24HourCount map[string]int
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = nftMessageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Printf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
if last24Hours != "" {
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = nftMessageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Printf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
}
// total of the last 30 days
var periodCount map[string]int
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
periodCount, err = nftMessageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Fatalf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
// daily totals
periodTotals := map[string]int{}
var dailyTotals map[string]map[string]int
wg.Add(1)
go func(prefix string, keySegments int, queryDays int) {
@ -331,16 +333,23 @@ func NFTs(w http.ResponseWriter, r *http.Request) {
defer wg.Done()
dailyTotals, err = createNFTCountsOfInterval(tbl, ctx, prefix, queryDays, keySegments)
if err != nil {
log.Fatalf("failed getting createCountsOfInterval err %v", err)
log.Fatalf("failed getting createNFTCountsOfInterval err %v", err)
}
// sum all the days to create a map with totals for the query period
for _, vals := range dailyTotals {
for chain, amount := range vals {
periodTotals[chain] += amount
}
}
}(prefix, keySegments, queryDays)
wg.Wait()
result := &totalsResult{
LastDayCount: last24HourCount,
TotalCount: periodCount,
DailyTotals: dailyTotals,
LastDayCount: last24HourCount,
TotalCount: periodTotals,
TotalCountDurationDays: queryDays,
DailyTotals: dailyTotals,
}
jsonBytes, err := json.Marshal(result)

View File

@ -27,14 +27,18 @@ type cumulativeResult struct {
// an in-memory cache of previously calculated results
var warmCumulativeCache = map[string]map[string]map[string]map[string]float64{}
var muWarmCumulativeCache sync.RWMutex
var warmCumulativeCacheFilePath = "/notional-transferred-to-cumulative-cache.json"
var warmCumulativeCacheFilePath = "notional-transferred-to-cumulative-cache.json"
var transferredToUpToYesterday = map[string]map[string]map[string]map[string]float64{}
var muTransferredToUpToYesterday sync.RWMutex
var transferredToUpToYesterdayFilePath = "/notional-transferred-to-up-to-yesterday-cache.json"
var transferredToUpToYesterdayFilePath = "notional-transferred-to-up-to-yesterday-cache.json"
// calculates the amount of each symbol transfered to each chain.
func transferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]float64 {
if _, ok := transferredToUpToYesterday["*"]; !ok {
loadJsonToInterface(ctx, transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, &transferredToUpToYesterday)
}
now := time.Now().UTC()
today := now.Format("2006-01-02")
oneDayAgo := -time.Duration(24) * time.Hour
@ -89,7 +93,7 @@ func transferredToSince(tbl *bigtable.Table, ctx context.Context, prefix string,
transferredToUpToYesterday[cachePrefix][yesterday] = upToYesterday
muTransferredToUpToYesterday.Unlock()
// write the updated cache to disc
persistInterfaceToJson(transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, transferredToUpToYesterday)
persistInterfaceToJson(ctx, transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, transferredToUpToYesterday)
} else {
muTransferredToUpToYesterday.Unlock()
}
@ -120,6 +124,10 @@ func getDaysInRange(start, end time.Time) []string {
// calcuates a running total of notional value transferred, by symbol, since the start time specified.
func createCumulativeAmountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
if _, ok := warmCumulativeCache["*"]; !ok {
loadJsonToInterface(ctx, warmCumulativeCacheFilePath, &muWarmCumulativeCache, &warmCumulativeCache)
}
now := time.Now().UTC()
today := now.Format("2006-01-02")
@ -147,7 +155,7 @@ func createCumulativeAmountsOfInterval(tbl *bigtable.Table, ctx context.Context,
// of each token transfer by symbol, based on the destination of the transfer.
for i, date := range dateKeys {
muWarmCumulativeCache.RLock()
if dateCache, ok := warmCumulativeCache[cachePrefix][date]; ok && dateCache != nil {
if dateCache, ok := warmCumulativeCache[cachePrefix][date]; ok && dateCache != nil && useCache(date) {
// have a cached value for this day, use it.
results[date] = dateCache
muWarmCumulativeCache.RUnlock()
@ -211,7 +219,7 @@ func createCumulativeAmountsOfInterval(tbl *bigtable.Table, ctx context.Context,
}
if cacheNeedsUpdate {
persistInterfaceToJson(warmCumulativeCacheFilePath, &muWarmCumulativeCache, warmCumulativeCache)
persistInterfaceToJson(ctx, warmCumulativeCacheFilePath, &muWarmCumulativeCache, warmCumulativeCache)
}
// take the most recent n days, rather than returning all days since launch

View File

@ -27,7 +27,7 @@ type amountsResult struct {
// an in-memory cache of previously calculated results
var warmTransfersToCache = map[string]map[string]map[string]map[string]float64{}
var muWarmTransfersToCache sync.RWMutex
var warmTransfersToCacheFilePath = "/notional-transferred-to-cache.json"
var warmTransfersToCacheFilePath = "notional-transferred-to-cache.json"
type TransferData struct {
TokenSymbol string
@ -121,6 +121,10 @@ func fetchTransferRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefi
// finds the daily amount of each symbol transferred to each chain, from the specified start to the present.
func amountsTransferredToInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]float64 {
if _, ok := warmTransfersToCache["*"]; !ok {
loadJsonToInterface(ctx, warmTransfersToCacheFilePath, &muWarmTransfersToCache, &warmTransfersToCache)
}
results := map[string]map[string]map[string]float64{}
now := time.Now().UTC()
@ -212,7 +216,7 @@ func amountsTransferredToInInterval(tbl *bigtable.Table, ctx context.Context, pr
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmTransfersToCacheFilePath, &muWarmTransfersToCache, warmTransfersToCache)
persistInterfaceToJson(ctx, warmTransfersToCacheFilePath, &muWarmTransfersToCache, warmTransfersToCache)
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same chain keys

View File

@ -25,11 +25,15 @@ type transfersResult struct {
// an in-memory cache of previously calculated results
var warmTransfersCache = map[string]map[string]map[string]map[string]map[string]float64{}
var muWarmTransfersCache sync.RWMutex
var warmTransfersCacheFilePath = "/notional-transferred-cache.json"
var warmTransfersCacheFilePath = "notional-transferred-cache.json"
// finds the daily amount of each symbol transferred from each chain, to each chain,
// from the specified start to the present.
func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start time.Time) map[string]map[string]map[string]map[string]float64 {
if _, ok := warmTransfersCache["*"]; !ok {
loadJsonToInterface(ctx, warmTransfersCacheFilePath, &muWarmTransfersCache, &warmTransfersCache)
}
results := map[string]map[string]map[string]map[string]float64{}
now := time.Now().UTC()
@ -71,7 +75,7 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
if dates, ok := warmTransfersCache[cachePrefix]; ok {
// have a cache for this query
if dateCache, ok := dates[dateStr]; ok && len(dateCache) > 1 {
if dateCache, ok := dates[dateStr]; ok && len(dateCache) > 1 && useCache(dateStr) {
// have a cache for this date
if daysAgo >= 1 {
@ -127,7 +131,7 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
if daysAgo >= 1 {
// set the result in the cache
muWarmTransfersCache.Lock()
if cacheData, ok := warmTransfersCache[cachePrefix][dateStr]; !ok || len(cacheData) == 1 {
if cacheData, ok := warmTransfersCache[cachePrefix][dateStr]; !ok || len(cacheData) == 1 || !useCache(dateStr) {
// cache does not have this date, add the data, and mark the cache stale
warmTransfersCache[cachePrefix][dateStr] = results[dateStr]
cacheNeedsUpdate = true
@ -140,7 +144,7 @@ func createTransfersOfInterval(tbl *bigtable.Table, ctx context.Context, prefix
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmTransfersCacheFilePath, &muWarmTransfersCache, warmTransfersCache)
persistInterfaceToJson(ctx, warmTransfersCacheFilePath, &muWarmTransfersCache, warmTransfersCache)
}
// having consistent keys in each object is helpful for clients, explorer GUI

View File

@ -51,7 +51,7 @@ var skipDays = map[string]bool{
// calcuates a running total of notional value transferred, by symbol, since the start time specified.
func createTvlCumulativeOfInterval(tbl *bigtable.Table, ctx context.Context, start time.Time) map[string]map[string]map[string]LockedAsset {
if len(warmTvlCumulativeCache) == 0 {
loadJsonToInterface(warmTvlCumulativeCacheFilePath, &muWarmTvlCumulativeCache, &warmTvlCumulativeCache)
loadJsonToInterface(ctx, warmTvlCumulativeCacheFilePath, &muWarmTvlCumulativeCache, &warmTvlCumulativeCache)
}
now := time.Now().UTC()
@ -197,7 +197,7 @@ func createTvlCumulativeOfInterval(tbl *bigtable.Table, ctx context.Context, sta
}
if cacheNeedsUpdate {
persistInterfaceToJson(warmTvlCumulativeCacheFilePath, &muWarmTvlCumulativeCache, warmTvlCumulativeCache)
persistInterfaceToJson(ctx, warmTvlCumulativeCacheFilePath, &muWarmTvlCumulativeCache, warmTvlCumulativeCache)
}
// take the most recent n days, rather than returning all days since launch

View File

@ -36,7 +36,7 @@ type LockedAsset struct {
// finds the daily amount of each symbol transferred to each chain, from the specified start to the present.
func tvlInInterval(tbl *bigtable.Table, ctx context.Context, start time.Time) map[string]map[string]map[string]LockedAsset {
if len(warmTvlCache) == 0 {
loadJsonToInterface(warmTvlFilePath, &muWarmTvlCache, &warmTvlCache)
loadJsonToInterface(ctx, warmTvlFilePath, &muWarmTvlCache, &warmTvlCache)
}
results := map[string]map[string]map[string]LockedAsset{}
@ -77,7 +77,7 @@ func tvlInInterval(tbl *bigtable.Table, ctx context.Context, start time.Time) ma
if len(warmTvlCache) >= 1 {
// have a cache, check if has the date
if dateCache, ok := warmTvlCache[dateStr]; ok && useCache(dateStr) && len(dateCache) > 1 {
if dateCache, ok := warmTvlCache[dateStr]; ok && len(dateCache) > 1 && useCache(dateStr) {
// have a cache for this date
if daysAgo >= 1 {
// only use the cache for yesterday and older
@ -162,7 +162,7 @@ func tvlInInterval(tbl *bigtable.Table, ctx context.Context, start time.Time) ma
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(warmTvlFilePath, &muWarmTvlCache, warmTvlCache)
persistInterfaceToJson(ctx, warmTvlFilePath, &muWarmTvlCache, warmTvlCache)
}
// create a set of all the keys from all dates/chains, to ensure the result objects all have the same chain keys
@ -351,7 +351,7 @@ func TVL(w http.ResponseWriter, r *http.Request) {
}
}
coinIdSet := []string{}
for coinId, _ := range seenCoinIds {
for coinId := range seenCoinIds {
coinIdSet = append(coinIdSet, coinId)
}

View File

@ -12,6 +12,7 @@ import (
"sort"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/bigtable"
@ -21,7 +22,10 @@ import (
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
var warmCache = map[string]map[string]string{}
var lastCacheReset = time.Now()
var lastCacheReset time.Time
var muWarmRecentCache sync.RWMutex
var warmRecentCacheFilePath = "recent-cache.json"
var timestampKey = "lastUpdate"
// query for last of each rowKey prefix
func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string {
@ -30,6 +34,21 @@ func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, pre
if prefix == "" {
cachePrefix = "*"
}
if _, ok := warmCache[cachePrefix]; !ok {
loadJsonToInterface(ctx, warmRecentCacheFilePath, &muWarmRecentCache, &warmCache)
}
cacheNeedsUpdate := false
if cache, ok := warmCache[cachePrefix]; ok {
if lastUpdate, ok := cache[timestampKey]; ok {
time, err := time.Parse(time.RFC3339, lastUpdate)
if err == nil {
lastCacheReset = time
} else {
log.Printf("failed parsing lastUpdate timestamp from cache. lastUpdate %v, err: %v ", lastUpdate, err)
}
}
}
var rowSet bigtable.RowSet
rowSet = bigtable.PrefixRange(prefix)
@ -42,9 +61,11 @@ func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, pre
maxSeq := "9999999999999999"
rowSets := bigtable.RowRangeList{}
for k, v := range cached {
start := fmt.Sprintf("%v:%v", k, v)
end := fmt.Sprintf("%v:%v", k, maxSeq)
rowSets = append(rowSets, bigtable.NewRange(start, end))
if k != timestampKey {
start := fmt.Sprintf("%v:%v", k, v)
end := fmt.Sprintf("%v:%v", k, maxSeq)
rowSets = append(rowSets, bigtable.NewRange(start, end))
}
}
if len(rowSets) >= 1 {
rowSet = rowSets
@ -54,11 +75,12 @@ func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, pre
// cache is more than hour old, don't use it, reset it
warmCache = map[string]map[string]string{}
lastCacheReset = now
cacheNeedsUpdate = true
}
// create a time range for query: last 30 days
thirtyDays := -time.Duration(24*30) * time.Hour
prev := now.Add(thirtyDays)
// create a time range for query: last seven days
sevenDays := -time.Duration(24*7) * time.Hour
prev := now.Add(sevenDays)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
end := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, maxNano, now.Location())
@ -82,10 +104,19 @@ func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, pre
}
// update the cache with the latest rows
warmCache[cachePrefix] = mostRecentByKeySegment
for k, v := range mostRecentByKeySegment {
warmCache[cachePrefix][k] = v
}
warmCache[cachePrefix][timestampKey] = time.Now().Format(time.RFC3339)
if cacheNeedsUpdate {
persistInterfaceToJson(ctx, warmRecentCacheFilePath, &muWarmRecentCache, warmCache)
}
return mostRecentByKeySegment
}
func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int, numRowsToFetch int) (map[string][]bigtable.Row, error) {
const MAX_INT64 = 9223372036854775807
func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int, numRowsToFetch uint64) (map[string][]bigtable.Row, error) {
// returns { key: []bigtable.Row }, key either being "*", "chainID", "chainID:address"
latest := getLatestOfEachEmitterAddress(tbl, ctx, prefix, keySegments)
@ -94,15 +125,31 @@ func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string
rangePairs := map[string]string{}
for prefixGroup, highestSequence := range latest {
numRows := numRowsToFetch
if prefixGroup == timestampKey {
continue
}
rowKeyParts := strings.Split(prefixGroup, ":")
// convert the sequence part of the rowkey from a string to an int, so it can be used for math
highSequence, _ := strconv.Atoi(highestSequence)
lowSequence := highSequence - numRowsToFetch
highSequence, err := strconv.ParseUint(highestSequence, 10, 64)
if err != nil {
log.Println("error parsing sequence string", highSequence)
}
if highSequence < numRows {
numRows = highSequence
}
lowSequence := highSequence - numRows
// create a rowKey to use as the start of the range query
rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)
// create a rowKey with the highest seen sequence + 1, because range end is exclusive
rangeQueryEnd := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], highSequence+1)
rangePairs[rangeQueryStart] = rangeQueryEnd
if highSequence >= lowSequence {
rangePairs[rangeQueryStart] = rangeQueryEnd
} else {
// governance messages have non-sequential sequence numbers.
log.Printf("skipping %v:%v because sequences are strange. high/low: %d/%d", rowKeyParts[0], rowKeyParts[1], highSequence, lowSequence)
}
}
rangeList := bigtable.RowRangeList{}
@ -199,12 +246,12 @@ func Recent(w http.ResponseWriter, r *http.Request) {
return
}
var resultCount int
var resultCount uint64
if numRows == "" {
resultCount = 30
} else {
var convErr error
resultCount, convErr = strconv.Atoi(numRows)
resultCount, convErr = strconv.ParseUint(numRows, 10, 64)
if convErr != nil {
fmt.Fprint(w, "numRows must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
@ -267,7 +314,7 @@ func Recent(w http.ResponseWriter, r *http.Request) {
return iTimestamp > jTimestamp
})
// trim the result down to the requested amount now that sorting is complete
num := len(v)
num := uint64(len(v))
var rows []bigtable.Row
if num > resultCount {
rows = v[:resultCount]

View File

@ -1,8 +1,11 @@
package p
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"math"
"net/http"
@ -13,6 +16,7 @@ import (
"cloud.google.com/go/bigtable"
"cloud.google.com/go/pubsub"
"cloud.google.com/go/storage"
"github.com/certusone/wormhole/node/pkg/vaa"
)
@ -24,6 +28,10 @@ var client *bigtable.Client
var clientOnce sync.Once
var tbl *bigtable.Table
var storageClient *storage.Client
var cacheBucketName string
var cacheBucket *storage.BucketHandle
var pubsubClient *pubsub.Client
var pubSubTokenTransferDetailsTopic *pubsub.Topic
@ -31,12 +39,6 @@ var coinGeckoCoins = map[string][]CoinGeckoCoin{}
var solanaTokens = map[string]SolanaToken{}
var releaseDay = time.Date(2021, 9, 13, 0, 0, 0, 0, time.UTC)
var pwd string
func initCache(waitgroup *sync.WaitGroup, filePath string, mutex *sync.RWMutex, cacheInterface interface{}) {
defer waitgroup.Done()
loadJsonToInterface(filePath, mutex, cacheInterface)
}
// init runs during cloud function initialization. So, this will only run during an
// an instance's cold start.
@ -64,108 +66,80 @@ func init() {
})
tbl = client.Open("v2Events")
cacheBucketName = os.Getenv("CACHE_BUCKET")
if cacheBucketName != "" {
// Create storage client.
var err error
storageClient, err = storage.NewClient(context.Background())
if err != nil {
log.Fatalf("Failed to create storage client: %v", err)
}
cacheBucket = storageClient.Bucket(cacheBucketName)
}
// create the topic that will be published to after decoding token transfer payloads
tokenTransferDetailsTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC")
if tokenTransferDetailsTopic != "" {
pubSubTokenTransferDetailsTopic = pubsubClient.Topic(tokenTransferDetailsTopic)
// fetch the token lists once at start up
coinGeckoCoins = fetchCoinGeckoCoins()
solanaTokens = fetchSolanaTokenList()
}
pwd, _ = os.Getwd()
// initialize in-memory caches
var initWG sync.WaitGroup
initWG.Add(1)
// populates cache used by amountsTransferredToInInterval
go initCache(&initWG, warmTransfersToCacheFilePath, &muWarmTransfersToCache, &warmTransfersToCache)
initWG.Add(1)
// populates cache used by createTransfersOfInterval
go initCache(&initWG, warmTransfersCacheFilePath, &muWarmTransfersCache, &warmTransfersCache)
initWG.Add(1)
// populates cache used by createAddressesOfInterval
go initCache(&initWG, warmAddressesCacheFilePath, &muWarmAddressesCache, &warmAddressesCache)
initWG.Add(1)
// populates cache used by transferredToSince
go initCache(&initWG, transferredToUpToYesterdayFilePath, &muTransferredToUpToYesterday, &transferredToUpToYesterday)
// initWG.Add(1)
// populates cache used by transferredSince
// initCache(initWG, transferredUpToYesterdayFilePath, &muTransferredToUpYesterday, &transferredUpToYesterday)
initWG.Add(1)
// populates cache used by addressesTransferredToSince
go initCache(&initWG, addressesToUpToYesterdayFilePath, &muAddressesToUpToYesterday, &addressesToUpToYesterday)
initWG.Add(1)
// populates cache used by createCumulativeAmountsOfInterval
go initCache(&initWG, warmCumulativeCacheFilePath, &muWarmCumulativeCache, &warmCumulativeCache)
initWG.Add(1)
// populates cache used by createCumulativeAddressesOfInterval
go initCache(&initWG, warmCumulativeAddressesCacheFilePath, &muWarmCumulativeAddressesCache, &warmCumulativeAddressesCache)
initWG.Wait()
log.Println("done initializing caches, starting.")
coinGeckoCoins = fetchCoinGeckoCoins()
solanaTokens = fetchSolanaTokenList()
}
var gcpCachePath = "/workspace/src/p/cache"
func timeTrack(start time.Time, name string) {
elapsed := time.Since(start)
log.Printf("%s took %s", name, elapsed)
}
func loadJsonToInterface(filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
// create path to the static cache dir
path := gcpCachePath + filePath
// create path to the "hot" cache dir
hotPath := "/tmp" + filePath
if strings.HasSuffix(pwd, "cmd") {
// alter the path to be correct when running locally, and in Tilt devnet
path = "../cache" + filePath
hotPath = ".." + hotPath
// reads the specified file from the CACHE_BUCKET and unmarshals the json into the supplied interface.
func loadJsonToInterface(ctx context.Context, filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
if cacheBucket == nil {
log.Println("no cacheBucket supplied, not going to read cache")
return
}
defer timeTrack(time.Now(), fmt.Sprintf("reading %v", filePath))
mutex.Lock()
// first check to see if there is a cache file in the tmp dir of the cloud function.
// if so, this is a long running instance with a recently generated cache available.
fileData, readErrTmp := os.ReadFile(hotPath)
if readErrTmp != nil {
log.Printf("failed reading from tmp cache %v, err: %v", hotPath, readErrTmp)
var readErr error
fileData, readErr = os.ReadFile(path)
if readErr != nil {
log.Printf("failed reading %v, err: %v", path, readErr)
} else {
log.Printf("successfully read from cache: %v", path)
}
} else {
log.Printf("successfully read from tmp cache: %v", hotPath)
reader, readErr := cacheBucket.Object(filePath).NewReader(ctx)
if readErr != nil {
log.Printf("Failed reading %v in GCS. err: %v", filePath, readErr)
}
defer reader.Close()
fileData, err := io.ReadAll(reader)
if err != nil {
log.Printf("loadJsonToInterface: unable to read data. file %q: %v", filePath, err)
}
unmarshalErr := json.Unmarshal(fileData, &cacheMap)
mutex.Unlock()
if unmarshalErr != nil {
log.Printf("failed unmarshaling %v, err: %v", path, unmarshalErr)
log.Printf("failed unmarshaling %v, err: %v", filePath, unmarshalErr)
}
}
func persistInterfaceToJson(filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
path := "/tmp" + filePath
if strings.HasSuffix(pwd, "cmd") {
// alter the path to be correct when running locally, and in Tilt devnet
path = "../cache" + filePath
// writes the supplied interface to the CACHE_BUCKET/filePath.
func persistInterfaceToJson(ctx context.Context, filePath string, mutex *sync.RWMutex, cacheMap interface{}) {
if cacheBucket == nil {
log.Println("no cacheBucket supplied, not going to persist cache")
return
}
defer timeTrack(time.Now(), fmt.Sprintf("writing %v", filePath))
mutex.Lock()
cacheBytes, marshalErr := json.MarshalIndent(cacheMap, "", " ")
if marshalErr != nil {
log.Fatal("failed marshaling cacheMap.", marshalErr)
}
writeErr := os.WriteFile(path, cacheBytes, 0666)
mutex.Unlock()
if writeErr != nil {
log.Fatalf("failed writing to file %v, err: %v", path, writeErr)
wc := cacheBucket.Object(filePath).NewWriter(ctx)
reader := bytes.NewReader(cacheBytes)
if _, writeErr := io.Copy(wc, reader); writeErr != nil {
log.Printf("failed writing to file %v, err: %v", filePath, writeErr)
}
mutex.Unlock()
if err := wc.Close(); err != nil {
log.Printf("Writer.Close with error: %v", err)
}
log.Printf("successfully wrote cache to file: %v", path)
}
var columnFamilies = []string{
@ -264,6 +238,16 @@ func chainIdStringToType(chainId string) vaa.ChainID {
return vaa.ChainIDBSC
case "5":
return vaa.ChainIDPolygon
case "6":
return vaa.ChainIDAvalanche
case "7":
return vaa.ChainIDOasis
case "8":
return vaa.ChainIDAlgorand
case "10":
return vaa.ChainIDFantom
case "10001":
return vaa.ChainIDEthereumRopsten
}
return vaa.ChainIDUnset
}

View File

@ -20,15 +20,18 @@ import (
const maxNano int = 999999999
type totalsResult struct {
LastDayCount map[string]int
TotalCount map[string]int
DailyTotals map[string]map[string]int
LastDayCount map[string]int
TotalCount map[string]int
TotalCountDurationDays int
DailyTotals map[string]map[string]int
}
// warmCache keeps some data around between invocations, so that we don't have
// to do a full table scan with each request.
// https://cloud.google.com/functions/docs/bestpractices/tips#use_global_variables_to_reuse_objects_in_future_invocations
var warmTotalsCache = map[string]map[string]map[string]int{}
var muWarmTotalsCache sync.RWMutex
var warmTotalsCacheFilePath = "totals-cache.json"
// derive the result index relevant to a row.
func makeGroupKey(keySegments int, rowKey string) string {
@ -63,7 +66,10 @@ func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string
}
func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
var mu sync.RWMutex
if _, ok := warmTotalsCache["2021-09-13"]; !ok {
loadJsonToInterface(ctx, warmTotalsCacheFilePath, &muWarmTotalsCache, &warmTotalsCache)
}
results := map[string]map[string]int{}
now := time.Now().UTC()
@ -78,6 +84,7 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
cachePrefix = "*"
}
cachePrefix = fmt.Sprintf("%v-%v", cachePrefix, keySegments)
cacheNeedsUpdate := false
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
go func(tbl *bigtable.Table, ctx context.Context, prefix string, daysAgo int) {
@ -99,11 +106,11 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
dateStr := start.Format("2006-01-02")
mu.Lock()
muWarmTotalsCache.Lock()
// initialize the map for this date in the result set
results[dateStr] = map[string]int{"*": 0}
// check to see if there is cache data for this date/query
if dateCache, ok := warmTotalsCache[dateStr]; ok {
if dateCache, ok := warmTotalsCache[dateStr]; ok && useCache(dateStr) {
// have a cache for this date
if val, ok := dateCache[cachePrefix]; ok {
@ -111,7 +118,7 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
if daysAgo >= 1 {
// only use the cache for yesterday and older
results[dateStr] = val
mu.Unlock()
muWarmTotalsCache.Unlock()
intervalsWG.Done()
return
}
@ -124,7 +131,7 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
warmTotalsCache[dateStr] = map[string]map[string]int{}
warmTotalsCache[dateStr][cachePrefix] = map[string]int{}
}
mu.Unlock()
muWarmTotalsCache.Unlock()
var result []bigtable.Row
var fetchErr error
@ -146,13 +153,23 @@ func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix str
results[dateStr][countBy] = results[dateStr][countBy] + 1
}
// set the result in the cache
warmTotalsCache[dateStr][cachePrefix] = results[dateStr]
if cacheData, ok := warmTotalsCache[dateStr][cachePrefix]; !ok || len(cacheData) <= 1 {
// set the result in the cache
muWarmTotalsCache.Lock()
warmTotalsCache[dateStr][cachePrefix] = results[dateStr]
muWarmTotalsCache.Unlock()
cacheNeedsUpdate = true
}
}(tbl, ctx, prefix, daysAgo)
}
intervalsWG.Wait()
if cacheNeedsUpdate {
persistInterfaceToJson(ctx, warmTotalsCacheFilePath, &muWarmTotalsCache, warmTotalsCache)
}
// create a set of all the keys from all dates, to ensure the result objects all have the same keys
seenKeySet := map[string]bool{}
for _, v := range results {
@ -210,12 +227,13 @@ func Totals(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, groupBy, forChain, forAddress string
var last24Hours, numDays, groupBy, forChain, forAddress string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
last24Hours = queryParams.Get("last24Hours")
numDays = queryParams.Get("numDays")
groupBy = queryParams.Get("groupBy")
forChain = queryParams.Get("forChain")
@ -232,10 +250,11 @@ func Totals(w http.ResponseWriter, r *http.Request) {
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
Last24Hours string `json:"last24Hours"`
NumDays string `json:"numDays"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
}
// deserialize request body
@ -250,6 +269,7 @@ func Totals(w http.ResponseWriter, r *http.Request) {
}
}
last24Hours = d.Last24Hours
numDays = d.NumDays
groupBy = d.GroupBy
forChain = d.ForChain
@ -261,10 +281,11 @@ func Totals(w http.ResponseWriter, r *http.Request) {
return
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
// default query period is all time
queryDays := int(time.Now().UTC().Sub(releaseDay).Hours() / 24)
// if the request included numDays, set the query period to that
if numDays != "" {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
@ -305,39 +326,23 @@ func Totals(w http.ResponseWriter, r *http.Request) {
// total of last 24 hours
var last24HourCount map[string]int
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = messageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Printf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
// total of the last 30 days
var periodCount map[string]int
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
now := time.Now().UTC()
prev := now.Add(periodInterval)
start := time.Date(prev.Year(), prev.Month(), prev.Day(), 0, 0, 0, 0, prev.Location())
defer wg.Done()
periodCount, err = messageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Fatalf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
if last24Hours != "" {
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
now := time.Now().UTC()
start := now.Add(last24HourInterval)
defer wg.Done()
last24HourCount, err = messageCountForInterval(tbl, ctx, prefix, start, now, keySegments)
if err != nil {
log.Printf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
}
// daily totals
periodTotals := map[string]int{}
var dailyTotals map[string]map[string]int
wg.Add(1)
go func(prefix string, keySegments int, queryDays int) {
@ -347,14 +352,21 @@ func Totals(w http.ResponseWriter, r *http.Request) {
if err != nil {
log.Fatalf("failed getting createCountsOfInterval err %v", err)
}
// sum all the days to create a map with totals for the query period
for _, vals := range dailyTotals {
for chain, amount := range vals {
periodTotals[chain] += amount
}
}
}(prefix, keySegments, queryDays)
wg.Wait()
result := &totalsResult{
LastDayCount: last24HourCount,
TotalCount: periodCount,
DailyTotals: dailyTotals,
LastDayCount: last24HourCount,
TotalCount: periodTotals,
TotalCountDurationDays: queryDays,
DailyTotals: dailyTotals,
}
jsonBytes, err := json.Marshal(result)