BigTable historical queries

- Add Cloud Functions:
  - "Recent" gap list, can filter and/or group by chain or address.
  - "Totals" counts 24h, 30d, rolling daily. can filter and group.
  - "Transaction" lookup row by chain-native transaction identifier.

- Pad sequence in rowkey to fixed length, for sequential row order.

- Add Cloud Function deploy instructions.

- Fix #410 Buildpack image cleanup

Change-Id: Ifa3110a3d58e2f94adb48ccb451c27ab3add0611
This commit is contained in:
justinschuldt 2021-09-21 11:08:07 -05:00 committed by Justin Schuldt
parent 75c818d2bf
commit 71dbe80aae
13 changed files with 1287 additions and 110 deletions

View File

@ -84,7 +84,7 @@ local_resource(
if explorer:
k8s_yaml_with_ns(
secret_yaml_generic(
"bridge-bigtable-key",
"node-bigtable-key",
from_file = "bigtable-key.json=" + bigTableKeyPath,
),
)
@ -235,50 +235,76 @@ k8s_resource("eth-devnet2", port_forwards = [
def build_cloud_function(container_name, go_func_name, path, builder):
# Invokes Tilt's custom_build(), with a Pack command.
# inspired by https://github.com/tilt-dev/tilt-extensions/tree/master/pack
caching_ref = container_name + ":tilt-build-pack-caching"
tag = "latest"
caching_ref = container_name + ":" + tag
pack_build_cmd = " ".join([
"./tools/bin/pack build",
caching_ref,
"--path " + path,
"--builder " + builder,
"--run-image devnet-cloud-function",
"--env " + "GOOGLE_FUNCTION_TARGET=%s" % go_func_name,
"--env " + "GOOGLE_FUNCTION_SIGNATURE_TYPE=http",
])
disable_push = True
skips_local_docker = True
if ci:
# inherit the DOCKER_HOST socket provided by custom_build.
pack_build_cmd = pack_build_cmd + " --docker-host inherit"
# do not attempt to access Docker cache in CI
# pack_build_cmd = pack_build_cmd + " --clear-cache"
# don't try to pull previous container versions in CI
pack_build_cmd = pack_build_cmd + " --pull-policy never"
# push to kubernetes registry
disable_push = False
skips_local_docker = False
docker_tag_cmd = "docker tag " + caching_ref + " $EXPECTED_REF"
docker_tag_cmd = "tilt docker -- tag " + caching_ref + " $EXPECTED_REF"
custom_build(
container_name,
pack_build_cmd + " && " + docker_tag_cmd,
[path],
tag=tag,
skips_local_docker=skips_local_docker,
disable_push=disable_push,
)
if explorer:
build_cloud_function(
container_name = "cloud-function-readrow",
go_func_name = "ReadRow",
path = "./event_database/cloud_functions",
builder = "gcr.io/buildpacks/builder:v1",
local_resource(
name = "devnet-cloud-function",
cmd = "tilt docker -- build -f ./event_database/cloud_functions/Dockerfile.run . -t devnet-cloud-function --label builtby=tilt",
env = {"DOCKER_BUILDKIT": "1"},
labels = ["explorer"],
)
local_resource(
name = "pack-bin",
cmd = "go build -mod=readonly -o bin/pack github.com/buildpacks/pack/cmd/pack",
dir = "tools",
labels = ["explorer"],
)
k8s_yaml_with_ns("devnet/bigtable.yaml")
k8s_resource("bigtable-emulator", port_forwards = [
port_forward(8086, name = "BigTable clients [:8086]"),
])
k8s_resource("bigtable-emulator",
port_forwards = [port_forward(8086, name = "BigTable clients [:8086]")],
labels = ["explorer"],
)
build_cloud_function(
container_name = "bigtable-functions",
go_func_name = "Entry",
path = "./event_database/cloud_functions",
builder = "gcr.io/buildpacks/builder:v1",
)
k8s_resource(
"bigtable-readrow",
resource_deps = ["proto-gen"],
port_forwards = [port_forward(8090, name = "ReadRow [:8090]")],
"bigtable-functions",
resource_deps = ["proto-gen", "bigtable-emulator"],
port_forwards = [port_forward(8090, name = "BigTable Functions [:8090]")],
labels = ["explorer"]
)
# explorer web app
@ -301,6 +327,7 @@ if explorer:
port_forwards = [
port_forward(8001, name = "Explorer Web UI [:8001]"),
],
labels = ["explorer"],
)
# terra devnet

View File

@ -64,43 +64,49 @@ apiVersion: v1
kind: Service
metadata:
labels:
app: bigtable-readrow
name: bigtable-readrow
app: bigtable-functions
name: bigtable-functions
spec:
ports:
- name: readrow
- name: functions
port: 8090
targetPort: readrow
targetPort: functions
protocol: TCP
selector:
app: bigtable-readrow
app: bigtable-functions
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
labels:
app: bigtable-readrow
name: bigtable-readrow
app: bigtable-functions
name: bigtable-functions
spec:
serviceName: bigtable-readrow
serviceName: bigtable-functions
replicas: 1
selector:
matchLabels:
app: bigtable-readrow
app: bigtable-functions
template:
metadata:
labels:
app: bigtable-readrow
app: bigtable-functions
spec:
containers:
- name: bigtable-readrow
image: cloud-function-readrow
- name: bigtable-functions
image: bigtable-functions
env:
- name: BIGTABLE_EMULATOR_HOST
value: bigtable-emulator:8086
- name: GCP_PROJECT
value: local-dev
- name: BIGTABLE_INSTANCE
value: wormhole
ports:
- containerPort: 8080
name: readrow
name: functions
protocol: TCP
readinessProbe:
httpGet:
port: 8080
path: /readyz

View File

@ -0,0 +1,5 @@
# syntax=docker.io/docker/dockerfile:experimental@sha256:de85b2f3a3e8a2f7fe48e8e84a65f6fdd5cd5183afa6412fff9caa6871649c44
FROM gcr.io/buildpacks/gcp/run@sha256:04f2e841ebbcc140c9f817e274caf8ae8ae0341008c4e01d8ef84cb8aa8c312a
# the "builtby=tilt" label is how Tilt determines which containters can be purged.
LABEL builtby=tilt

View File

@ -2,34 +2,34 @@
This is a reference implementaion for getting data out of BigTable.
### deploying
First deploy (creation) must include all the flags to configure the environment:
gcloud functions --project your-project deploy testnet --region europe-west3 --entry-point Entry --runtime go116 --trigger-http --allow-unauthenticated --service-account=your-readonly@your-project.iam.gserviceaccount.com --update-env-vars GCP_PROJECT=your-project,BIGTABLE_INSTANCE=wormhole-testnet
Subsequent deploys (updates) only need include flags to indentify the resource for updating: project, region, name.
gcloud functions --project your-project deploy testnet --region europe-west3 --entry-point Entry
### invocation
both methods read the same data. just two different ways of querying:
All routes accept their input(s) as query parameters, or request body. Just two different ways of querying:
GET
```bash
curl "https://region-project-id.cloudfunctions.net/your-function-name?emitterChain=2&emitterAddress=000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9c&sequence=6"
curl "https://region-project-id.cloudfunctions.net/testnet/readrow?emitterChain=2&emitterAddress=000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9c&sequence=0000000000000006"
```
POST
```bash
curl -X POST https://region-project-id.cloudfunctions.net/your-function-name \
curl -X POST https://region-project-id.cloudfunctions.net/testnet/readrow \
-H "Content-Type:application/json" \
-d \
'{"emitterChain":"2", "emitterAddress":"000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9c", "sequence":"6"}' | jq '.'
{
"Message": {
"InitiatingTxID": "0x47727f32a3c6033044fd9f11778c6b5691262533607a654fd020c068e5d12fba",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAAApD7FnIIr0VbsTd4AWO3t6mhDBYAAjsehQZiDTcv6/TspR/9xdoL+60kUe5xBKmz74vCab+YAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=="
},
"GuardianAddresses": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAA": "AQAAAAABAIOSpeda6nEXWxJoS/d59cniULw0+DDSOVBxxOZPltunSM0BHgoJh6Srbg8Fa4eqLlifpCibLJx9MbJSwbXerZkAAAE4yuBzAQAAAgAAAAAAAAAAAAAAAOmC5GKwlIUPEq+U0h1HDiG+nQ6cAAAAAAAAAAYPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAAApD7FnIIr0VbsTd4AWO3t6mhDBYAAjsehQZiDTcv6/TspR/9xdoL+60kUe5xBKmz74vCab+YAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-08-11 00:16:11.757 +0000 UTC"
}
'{"emitterChain":"2", "emitterAddress":"000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9c", "sequence":"0000000000000006"}'
```
See [./bigtable-endpoints.md](./bigtable-endpoints.md) for API patterns

View File

@ -0,0 +1,375 @@
# API design
There are two endpoints designed to be flexible enough to answer most questions; "recent" and "totals".
- "recent" returns rows, is a gap-list query
- "totals" returns counts of how many rows were found in the period
---
## QueryParams
These endpoints can be used to query across all chains and addresses, and you can also drill-down into a chain or address.
### groupBy
- `groupBy=chain` results will be grouped by (keyed by) `emitterChain`.
- `groupBy=address` results will be be grouped by (keyed by) `emitterChain:emitterAddress`.
### filter
- `forChain=2` only returns results for the specified chain.
- `forChain=2&forAddress=c69a...cb4f` only returns results for the specified chain + address.
### endpoint specific
- `/totals?numDays=6` specify the query interval.
- `/recent?numRows=6` specify the number of results.
---
## `Totals` function
Get the number of messages in the last 7 days. The `*` key designates all results.
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/totals?numDays=7
```json
{
"LastDayCount": { "*": 14},
"PeriodCount": { "*": 69},
"DailyTotals": {
"2021-09-21": {"*": 55},
"2021-09-22": {"*": 0},
"2021-09-23": {"*": 0},
"2021-09-24": {"*": 0},
"2021-09-25": {"*": 0},
"2021-09-26": {"*": 0},
"2021-09-27": {"*": 14},
"2021-09-28": {"*": 0},
}
}
```
Get message counts grouped by chain, for the last 7 days:
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/totals?groupBy=chain&numDays=7
```json
{
"LastDayCount": {
"1": 8,
"2": 3,
"4": 3,
"*": 14
},
"LastMonthCount": {
"1": 21,
"2": 24,
"4": 24,
"*": 69
},
"DailyTotals": {
"2021-09-21": {
"1": 13,
"2": 21,
"4": 21,
"*": 55
},
"2021-09-22": {
"1": 0,
"2": 0,
"4": 0,
"*": 0
},
"2021-09-23": {
"1": 0,
"2": 0,
"4": 0,
"*": 0
},
"2021-09-24": {
"1": 0,
"2": 0,
"4": 0,
"*": 0
},
"2021-09-25": {
"1": 0,
"2": 0,
"4": 0,
"*": 0
},
"2021-09-26": {
"1": 0,
"2": 0,
"4": 0,
"*": 0
},
"2021-09-27": {
"1": 8,
"2": 3,
"4": 3,
"*": 14
},
"2021-09-28": {
"1": 0,
"2": 0,
"4": 0,
"*": 0
}
}
}
```
Get message counts grouped by EmitterAddress, for the previous 3 days (includes the current day):
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/totals?groupBy=address&numDays=3
```json
{
"LastDayCount": {
"*": 14,
"1:96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": 1,
"1:c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": 7,
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 3,
"4:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 3
},
"TotalCount": {
"*": 14,
"1:96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": 1,
"1:c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": 7,
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 3,
"4:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 3
},
"DailyTotals": {
"2021-09-25": {
"*": 0,
"1:96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": 0,
"1:c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": 0,
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 0,
"4:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 0
},
"2021-09-26": {
"*": 0,
"1:96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": 0,
"1:c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": 0,
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 0,
"4:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 0
},
"2021-09-27": {
"*": 14,
"1:96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": 1,
"1:c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": 7,
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 3,
"4:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 3
},
"2021-09-28": {
"*": 0,
"1:96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab": 0,
"1:c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f": 0,
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 0,
"4:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": 0
}
}
}
```
---
## `Recent` function
Get the 2 most recent messages:
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/recent?numRows=2
```json
{
"*": [
{
"EmitterChain": "solana",
"EmitterAddress": "c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f",
"Sequence": "17",
"InitiatingTxID": "0xd418d81b7b2f298a37b28b97e240237b6210f00b702d2101d5e423ab5fa6366b",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABADjricLUCKqwbuHYEgG8dMetrH5acGibV/l4z6mNzYmyXlE0sPK4lVngQ5c+vwWU0XYVlrh1KoCsEhZF132ouo8BYUk6ywAA1PUAAcaaGxpl3TNr8d9qd6+1Afwl23/Ak4ywhZWp70cyZctPAAAAAAAAABEgAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:52:26.038 +0000 UTC"
},
{
"EmitterChain": "solana",
"EmitterAddress": "c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f",
"Sequence": "16",
"InitiatingTxID": "0xd2bcadceb8c1beb7cd531e2c621733b96df96a397ea88abb948cc28c1546e139",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABACISbeEGlIf5z32yTEQDw2zNgS4GUj36YSTlSCqTj4lgaH663yeir/4Gi9iM6OWWc4Vct2UiE5jfv4PW8MTrdr0BYUk6sAAABBMAAcaaGxpl3TNr8d9qd6+1Afwl23/Ak4ywhZWp70cyZctPAAAAAAAAABAgAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:51:59.138 +0000 UTC"
}
]
}
```
Get the 2 most recent messages for each chain:
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/recent?numRows=2&groupBy=chain
```json
{
"1": [
{
"EmitterChain": "solana",
"EmitterAddress": "c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f",
"Sequence": "19",
"InitiatingTxID": "0xd7a34663ce6ee1d1c42f24513f6f37221e81e16a5153d542d2c951af1401e49d",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAOcc6ah0v1QFBl8SOkzKzAme6I2Us/kGwM1QCumJNqOnGmsH82w0k+1kgxu6yHA1XKRNUbJFgz/RfHrgfXUXKeEBYUk7PwAAph4AAcaaGxpl3TNr8d9qd6+1Afwl23/Ak4ywhZWp70cyZctPAAAAAAAAABMgAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:54:22.107 +0000 UTC"
},
{
"EmitterChain": "solana",
"EmitterAddress": "c69a1b1a65dd336bf1df6a77afb501fc25db7fc0938cb08595a9ef473265cb4f",
"Sequence": "18",
"InitiatingTxID": "0x32e8a87d4cd8a717e4d785bb317398c4cc8e36fbe45c53b75e4e85dc1181c92b",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAMCe6wEJplDwtyr7ELM15nrSSMSr6xYcuDC3qA0Mx1WKdy7WRXE13tP9SyMJ/sYESqpJtgvYnNEB3wnUeEbW2scAYUk6+AAAGp4AAcaaGxpl3TNr8d9qd6+1Afwl23/Ak4ywhZWp70cyZctPAAAAAAAAABIgAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAgAAAAAAAAAAAAAAAJD4v2pHnzIOrQdEEaSw55ROqMnBAAIAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:53:11.139 +0000 UTC"
}
],
"2": [
{
"EmitterChain": "ethereum",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "23",
"InitiatingTxID": "0x0515a7375f101e79a1d5e0f5159cce98fe8fe861bd2ab548e22f43375b04defb",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAGclDJrZDoZ2BxHBCxpPHZFwRhwesOgV9gkcGCeqBQaTZj/PjYM/25a5owDllBvS2pAg0nkRWYJskJf+Z3vIqLcAAAAW9pRWAAAAAgAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABcPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:48:27.025 +0000 UTC"
},
{
"EmitterChain": "ethereum",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "22",
"InitiatingTxID": "0x9f2dbf04c8088009b8c0ae1313baee546ac604ad5f608dcf5291bee4aa19b57b",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAAPsvYSDgik3jFPBiH97URck6lQxeXKixD/U3YplSwx4EZPeVWLzqgzjCb5nhBhAafYY5MmVSf8YF1cnPW4qXO0BAAAW0sNgAQAAAgAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABYPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:47:51.506 +0000 UTC"
}
],
"4": [
{
"EmitterChain": "bsc",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "23",
"InitiatingTxID": "0x0515a7375f101e79a1d5e0f5159cce98fe8fe861bd2ab548e22f43375b04defb",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAEc9grHDBKGhicCbWPFFuEKxfEuWc+PS0C3smLeIrBkVCdm9Tg8q76MK47OeuTF+ieTAxG+d/z2B9OeMWd87oMsAAAAW9pRWAAAABAAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABcPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:48:26.983 +0000 UTC"
},
{
"EmitterChain": "bsc",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "22",
"InitiatingTxID": "0x9f2dbf04c8088009b8c0ae1313baee546ac604ad5f608dcf5291bee4aa19b57b",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABABSFvsV41QWUwqKJC+Q62PtxHWmludvu4AKQDxorezX4BzYhX0rkj9BDxPtEc+utn6Y5q/ryft+PdWX8WIDhxSMAAAAW0sNgAQAABAAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABYPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:47:51.419 +0000 UTC"
}
]
}
```
Get the 2 most recent messages for a specific address:
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/recent?numRows=2&forChain=2&forAddress=0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16
```json
{
"2:0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16": [
{
"EmitterChain": "ethereum",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "23",
"InitiatingTxID": "0x0515a7375f101e79a1d5e0f5159cce98fe8fe861bd2ab548e22f43375b04defb",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAGclDJrZDoZ2BxHBCxpPHZFwRhwesOgV9gkcGCeqBQaTZj/PjYM/25a5owDllBvS2pAg0nkRWYJskJf+Z3vIqLcAAAAW9pRWAAAAAgAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABcPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:48:27.025 +0000 UTC"
},
{
"EmitterChain": "ethereum",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "22",
"InitiatingTxID": "0x9f2dbf04c8088009b8c0ae1313baee546ac604ad5f608dcf5291bee4aa19b57b",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAAPsvYSDgik3jFPBiH97URck6lQxeXKixD/U3YplSwx4EZPeVWLzqgzjCb5nhBhAafYY5MmVSf8YF1cnPW4qXO0BAAAW0sNgAQAAAgAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABYPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:47:51.506 +0000 UTC"
}
]
}
```
---
## `Transaction` function
Lookup a message by the native transaction identifier from the user's interaction:
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/transaction?id=0x0515a7375f101e79a1d5e0f5159cce98fe8fe861bd2ab548e22f43375b04defb
```json
{
"EmitterChain": "bsc",
"EmitterAddress": "0000000000000000000000000290fb167208af455bb137780163b7b7a9a10c16",
"Sequence": "23",
"InitiatingTxID": "0x0515a7375f101e79a1d5e0f5159cce98fe8fe861bd2ab548e22f43375b04defb",
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAEc9grHDBKGhicCbWPFFuEKxfEuWc+PS0C3smLeIrBkVCdm9Tg8q76MK47OeuTF+ieTAxG+d/z2B9OeMWd87oMsAAAAW9pRWAAAABAAAAAAAAAAAAAAAAAKQ+xZyCK9FW7E3eAFjt7epoQwWAAAAAAAAABcPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAA3bZP5GqR1G7ilCBTn8Jf0Hxf6j4AAlraZ6SC3I261q1BLAdbD9zRURvzAgIW7YAEZEXawNBFAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
"QuorumTime": "2021-09-21 01:48:26.983 +0000 UTC"
}
```
---
## `ReadRow` function
Lookup a message by the MessageID values:
https://us-east4-wormhole-315720.cloudfunctions.net/devnet/readrow?emitterChain=1&emitterAddress=96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab&sequence=0
```json
{
"EmitterChain": "solana",
"EmitterAddress": "96ee982293251b48729804c8e8b24b553eb6b887867024948d2236fd37a577ab",
"Sequence": "0",
"InitiatingTxID": "0xcc3aedef591ff7725b9a1873a006b1431a6cc6e3ae69f03f7692a6053de06b3e",
"Payload": "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAAFQVU5L8J+OuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAE5vdCBhIFBVTkvwn464AAAAAAAAAAAAAAAAAAAAAAAAnABsSMjL8zhJywej+TYVnMUj+VkcsZmavUWJDsX+6bczaHR0cHM6Ly93cmFwcGVkcHVua3MuY29tOjMwMDAvYXBpL3B1bmtzL21ldGFkYXRhLzM5AAAAAAAAAAAAAAAAkPi/akefMg6tB0QRpLDnlE6oycEAAg==",
"GuardiansThatSigned": [
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
],
"SignedVAABytes": "AQAAAAABAP9HdhYz1TU+XRH7fVlYU9FJH8WVxknCJwDoPHvCM/2FMkRS8vuEIo/yvoW8TLkNJq7ydXhhZNzc/elwsBEEqZkBYVJaqAABTIMAAZbumCKTJRtIcpgEyOiyS1U+triHhnAklI0iNv03pXerAAAAAAAAAAABAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAAFQVU5L8J+OuAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAE5vdCBhIFBVTkvwn464AAAAAAAAAAAAAAAAAAAAAAAAnABsSMjL8zhJywej+TYVnMUj+VkcsZmavUWJDsX+6bczaHR0cHM6Ly93cmFwcGVkcHVua3MuY29tOjMwMDAvYXBpL3B1bmtzL21ldGFkYXRhLzM5AAAAAAAAAAAAAAAAkPi/akefMg6tB0QRpLDnlE6oycEAAg==",
"QuorumTime": "2021-09-27 23:58:33.874 +0000 UTC"
}
```

View File

@ -1,6 +1,6 @@
module example.com/cloudfunction
module github.com/certusone/wormhole/event_database/cloud_functions
// cloud runtime is go 1.13. just for reference.
// cloud runtime is go 1.16. just for reference.
require (
cloud.google.com/go/bigtable v1.10.1

View File

@ -11,64 +11,11 @@ import (
"net/http"
"os"
"strings"
"sync"
"cloud.google.com/go/bigtable"
)
// client is a global Bigtable client, to avoid initializing a new client for
// every request.
var client *bigtable.Client
var clientOnce sync.Once
var columnFamilies = []string{"MessagePublication", "Signatures", "VAAState", "QuorumState"}
type (
MessagePub struct {
InitiatingTxID string
Payload []byte
}
Summary struct {
Message MessagePub
GuardianAddresses []string
SignedVAA []byte
QuorumTime string
}
)
func makeSummary(row bigtable.Row) *Summary {
summary := &Summary{}
if _, ok := row[columnFamilies[0]]; ok {
message := &MessagePub{}
for _, item := range row[columnFamilies[0]] {
switch item.Column {
case "MessagePublication:InitiatingTxID":
message.InitiatingTxID = string(item.Value)
case "MessagePublication:Payload":
message.Payload = item.Value
}
}
summary.Message = *message
}
if _, ok := row[columnFamilies[1]]; ok {
for _, item := range row[columnFamilies[1]] {
column := strings.Split(item.Column, ":")
summary.GuardianAddresses = append(summary.GuardianAddresses, column[1])
}
}
if _, ok := row[columnFamilies[3]]; ok {
for _, item := range row[columnFamilies[3]] {
if item.Column == "QuorumState:SignedVAA" {
summary.SignedVAA = item.Value
summary.QuorumTime = item.Timestamp.Time().String()
}
}
}
return summary
}
// fetch a single row by the row key
func ReadRow(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
@ -82,15 +29,15 @@ func ReadRow(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var rowKey string
var emitterChain, emitterAddress, sequence, rowKey string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
emitterChain := queryParams.Get("emitterChain")
emitterAddress := queryParams.Get("emitterAddress")
sequence := queryParams.Get("sequence")
emitterChain = queryParams.Get("emitterChain")
emitterAddress = queryParams.Get("emitterAddress")
sequence = queryParams.Get("sequence")
readyCheck := queryParams.Get("readyCheck")
if readyCheck != "" {
@ -102,11 +49,10 @@ func ReadRow(w http.ResponseWriter, r *http.Request) {
// check for empty values
if emitterChain == "" || emitterAddress == "" || sequence == "" {
fmt.Fprint(w, "body values cannot be empty")
fmt.Fprint(w, "query params ['emitterChain', 'emitterAddress', 'sequence'] cannot be empty")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
rowKey = emitterChain + ":" + emitterAddress + ":" + sequence
case http.MethodPost:
// declare request body properties
var d struct {
@ -130,22 +76,44 @@ func ReadRow(w http.ResponseWriter, r *http.Request) {
// check for empty values
if d.EmitterChain == "" || d.EmitterAddress == "" || d.Sequence == "" {
fmt.Fprint(w, "body values cannot be empty")
fmt.Fprint(w, "body values ['emitterChain', 'emitterAddress', 'sequence'] cannot be empty")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
rowKey = d.EmitterChain + ":" + d.EmitterAddress + ":" + d.Sequence
emitterChain = d.EmitterChain
emitterAddress = d.EmitterAddress
sequence = d.Sequence
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
// pad sequence to 16 characters
if len(sequence) <= 15 {
sequence = fmt.Sprintf("%016s", sequence)
}
// convert chain name to chainID
if len(emitterChain) > 1 {
chainNameMap := map[string]string{
"solana": "1",
"ethereum": "2",
"terra": "3",
"bsc": "4",
}
lowercaseChain := strings.ToLower(emitterChain)
if _, ok := chainNameMap[lowercaseChain]; ok {
emitterChain = chainNameMap[lowercaseChain]
}
}
rowKey = emitterChain + ":" + emitterAddress + ":" + sequence
clientOnce.Do(func() {
// Declare a separate err variable to avoid shadowing client.
var err error
project := os.Getenv("GCP_PROJECT")
client, err = bigtable.NewClient(context.Background(), project, "wormhole")
instance := os.Getenv("BIGTABLE_INSTANCE")
client, err = bigtable.NewClient(context.Background(), project, instance)
if err != nil {
http.Error(w, "Error initializing client", http.StatusInternalServerError)
log.Printf("bigtable.NewClient: %v", err)

View File

@ -0,0 +1,246 @@
// Package p contains an HTTP Cloud Function.
package p
import (
"context"
"encoding/json"
"fmt"
"html"
"io"
"log"
"net/http"
"os"
"sort"
"strconv"
"strings"
"cloud.google.com/go/bigtable"
)
// query for last of each rowKey prefix
func getLatestOfEachEmitterAddress(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int) map[string]string {
mostRecentByKeySegment := map[string]string{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
keyParts := strings.Split(row.Key(), ":")
groupByKey := strings.Join(keyParts[:2], ":")
mostRecentByKeySegment[groupByKey] = row.Key()
return true
// TODO - add filter to only return rows created within the last 30(?) days
}, bigtable.RowFilter(bigtable.StripValueFilter()))
if err != nil {
log.Fatalf("failed to read recent rows: %v", err)
}
return mostRecentByKeySegment
}
func fetchMostRecentRows(tbl *bigtable.Table, ctx context.Context, prefix string, keySegments int, numRowsToFetch int) (map[string][]bigtable.Row, error) {
// returns { key: []bigtable.Row }, key either being "*", "chainID", "chainID:address"
latest := getLatestOfEachEmitterAddress(tbl, ctx, prefix, keySegments)
// key/value pairs are the start/stop rowKeys for range queries
rangePairs := map[string]string{}
for _, highestSequenceKey := range latest {
rowKeyParts := strings.Split(highestSequenceKey, ":")
// convert the sequence part of the rowkey from a string to an int, so it can be used for math
highSequence, _ := strconv.Atoi(rowKeyParts[2])
lowSequence := highSequence - numRowsToFetch
// create a rowKey to use as the start of the range query
rangeQueryStart := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], lowSequence)
// create a rowKey with the highest seen sequence + 1, because range end is exclusive
rangeQueryEnd := fmt.Sprintf("%v:%v:%016d", rowKeyParts[0], rowKeyParts[1], highSequence+1)
rangePairs[rangeQueryStart] = rangeQueryEnd
}
rangeList := bigtable.RowRangeList{}
for k, v := range rangePairs {
rangeList = append(rangeList, bigtable.NewRange(k, v))
}
results := map[string][]bigtable.Row{}
err := tbl.ReadRows(ctx, rangeList, func(row bigtable.Row) bool {
var groupByKey string
if keySegments == 0 {
groupByKey = "*"
} else {
keyParts := strings.Split(row.Key(), ":")
groupByKey = strings.Join(keyParts[:keySegments], ":")
}
results[groupByKey] = append(results[groupByKey], row)
return true
})
if err != nil {
log.Printf("failed reading row ranges. err: %v", err)
return nil, err
}
return results, nil
}
// fetch recent rows.
// optionally group by a EmitterChain or EmitterAddress
// optionally query for recent rows of a given EmitterChain or EmitterAddress
func Recent(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numRows, groupBy, forChain, forAddress string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
numRows = queryParams.Get("numRows")
groupBy = queryParams.Get("groupBy")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
readyCheck := queryParams.Get("readyCheck")
if readyCheck != "" {
// for running in devnet
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, html.EscapeString("ready"))
return
}
case http.MethodPost:
// declare request body properties
var d struct {
NumRows string `json:"numRows"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
numRows = d.NumRows
groupBy = d.GroupBy
forChain = d.ForChain
forAddress = d.ForAddress
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
var resultCount int
if numRows == "" {
resultCount = 30
} else {
var convErr error
resultCount, convErr = strconv.Atoi(numRows)
if convErr != nil {
fmt.Fprint(w, "numRows must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create bibtable client and open table
clientOnce.Do(func() {
// Declare a separate err variable to avoid shadowing client.
var err error
project := os.Getenv("GCP_PROJECT")
instance := os.Getenv("BIGTABLE_INSTANCE")
client, err = bigtable.NewClient(context.Background(), project, instance)
if err != nil {
http.Error(w, "Error initializing client", http.StatusInternalServerError)
log.Printf("bigtable.NewClient: %v", err)
return
}
})
tbl := client.Open("v2Events")
// use the groupBy value to determine how many segements of the rowkey should be used for indexing results.
keySegments := 0
if groupBy == "chain" {
keySegments = 1
}
if groupBy == "address" {
keySegments = 2
}
// create the rowkey prefix for querying, and the keySegments to use for indexing results.
prefix := ""
if forChain != "" {
prefix = forChain
if groupBy == "" {
// groupBy was not set, but forChain was, so set the keySegments to index by chain
keySegments = 1
}
if forAddress != "" {
prefix = forChain + ":" + forAddress
if groupBy == "" {
// groupBy was not set, but forAddress was, so set the keySegments to index by address
keySegments = 2
}
}
}
recent, err := fetchMostRecentRows(tbl, r.Context(), prefix, keySegments, resultCount)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
res := map[string][]*Summary{}
for k, v := range recent {
sort.Slice(v, func(i, j int) bool {
// bigtable rows dont have timestamps, use a cell timestamp all rows will have.
return v[i]["MessagePublication"][0].Timestamp > v[j]["MessagePublication"][0].Timestamp
})
// trim the result down to the requested amount now that sorting is complete
num := len(v)
var rows []bigtable.Row
if num > resultCount {
rows = v[:resultCount]
} else {
rows = v[:]
}
res[k] = make([]*Summary, len(rows))
for i, r := range rows {
res[k][i] = makeSummary(r)
}
}
jsonBytes, err := json.Marshal(res)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
w.WriteHeader(http.StatusOK)
w.Write(jsonBytes)
}

View File

@ -0,0 +1,88 @@
package p
import (
"net/http"
"strings"
"sync"
"cloud.google.com/go/bigtable"
)
// shared code for the various functions, primarily response formatting.
// client is a global Bigtable client, to avoid initializing a new client for
// every request.
var client *bigtable.Client
var clientOnce sync.Once
var columnFamilies = []string{"MessagePublication", "Signatures", "VAAState", "QuorumState"}
type (
Summary struct {
EmitterChain string
EmitterAddress string
Sequence string
InitiatingTxID string
Payload []byte
GuardiansThatSigned []string
SignedVAABytes []byte
QuorumTime string
}
)
func makeSummary(row bigtable.Row) *Summary {
summary := &Summary{}
if _, ok := row[columnFamilies[0]]; ok {
for _, item := range row[columnFamilies[0]] {
switch item.Column {
case "MessagePublication:InitiatingTxID":
summary.InitiatingTxID = string(item.Value)
case "MessagePublication:Payload":
summary.Payload = item.Value
case "MessagePublication:EmitterChain":
summary.EmitterChain = string(item.Value)
case "MessagePublication:EmitterAddress":
summary.EmitterAddress = string(item.Value)
case "MessagePublication:Sequence":
summary.Sequence = string(item.Value)
}
}
}
if _, ok := row[columnFamilies[1]]; ok {
for _, item := range row[columnFamilies[1]] {
column := strings.Split(item.Column, ":")
summary.GuardiansThatSigned = append(summary.GuardiansThatSigned, column[1])
}
}
if _, ok := row[columnFamilies[3]]; ok {
for _, item := range row[columnFamilies[3]] {
if item.Column == "QuorumState:SignedVAA" {
summary.SignedVAABytes = item.Value
summary.QuorumTime = item.Timestamp.Time().String()
}
}
}
return summary
}
var mux = newMux()
// Entry is the cloud function entry point
func Entry(w http.ResponseWriter, r *http.Request) {
mux.ServeHTTP(w, r)
}
func newMux() *http.ServeMux {
mux := http.NewServeMux()
mux.HandleFunc("/totals", Totals)
mux.HandleFunc("/recent", Recent)
mux.HandleFunc("/transaction", Transaction)
mux.HandleFunc("/readrow", ReadRow)
mux.HandleFunc("/readyz", func(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusOK) })
return mux
}

View File

@ -0,0 +1,332 @@
// Package p contains an HTTP Cloud Function.
package p
import (
"context"
"encoding/json"
"fmt"
"html"
"io"
"log"
"net/http"
"os"
"strconv"
"strings"
"sync"
"time"
"cloud.google.com/go/bigtable"
)
const maxNano int = 999999999
type totalsResult struct {
LastDayCount map[string]int
TotalCount map[string]int
DailyTotals map[string]map[string]int
}
// derive the result index relevant to a row.
func makeGroupKey(keySegments int, rowKey string) string {
var countBy string
if keySegments == 0 {
countBy = "*"
} else {
keyParts := strings.Split(rowKey, ":")
countBy = strings.Join(keyParts[:keySegments], ":")
}
return countBy
}
func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string, start, end time.Time) ([]bigtable.Row, error) {
rows := []bigtable.Row{}
err := tbl.ReadRows(ctx, bigtable.PrefixRange(prefix), func(row bigtable.Row) bool {
rows = append(rows, row)
return true
}, bigtable.RowFilter(
bigtable.ChainFilters(
// combine filters to get only what we need:
bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets)
bigtable.TimestampRangeFilter(start, end), // within time range
bigtable.StripValueFilter(), // no columns/values, just the row.Key()
)))
return rows, err
}
func createCountsOfInterval(tbl *bigtable.Table, ctx context.Context, prefix string, numPrevDays int, keySegments int) (map[string]map[string]int, error) {
results := map[string]map[string]int{}
// key track of all the keys seen, to ensure the result objects all have the same keys
seenKeySet := map[string]bool{}
now := time.Now()
for daysAgo := 0; daysAgo <= numPrevDays; daysAgo++ {
// start is the SOD, end is EOD
// "0 daysAgo start" is 00:00:00 AM of the current day
// "0 daysAgo end" is 23:59:59 of the current day (the future)
// calulate the start and end times for the query
hoursAgo := (24 * daysAgo)
daysAgoDuration := -time.Duration(hoursAgo) * time.Hour
n := now.Add(daysAgoDuration)
year := n.Year()
month := n.Month()
day := n.Day()
loc := n.Location()
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
end := time.Date(year, month, day, 23, 59, 59, maxNano, loc)
result, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
return nil, fetchErr
}
dateStr := start.Format("2006-01-02")
// initialize the map for this date in the result set
if results[dateStr] == nil {
results[dateStr] = map[string]int{"*": 0}
}
// iterate through the rows and increment the count
for _, row := range result {
countBy := makeGroupKey(keySegments, row.Key())
if keySegments != 0 {
// increment the total count
results[dateStr]["*"] = results[dateStr]["*"] + 1
}
results[dateStr][countBy] = results[dateStr][countBy] + 1
// add this key to the set
seenKeySet[countBy] = true
}
}
// ensure each date object has the same keys:
for _, v := range results {
for key := range seenKeySet {
if _, ok := v[key]; !ok {
// add the missing key to the map
v[key] = 0
}
}
}
return results, nil
}
// returns the count of the rows in the query response
func messageCountForInterval(tbl *bigtable.Table, ctx context.Context, prefix string, interval time.Duration, keySegments int) (map[string]int, error) {
now := time.Now()
// calulate the start and end times for the query
n := now.Add(interval)
year := n.Year()
month := n.Month()
day := n.Day()
loc := n.Location()
start := time.Date(year, month, day, 0, 0, 0, 0, loc)
end := time.Date(now.Year(), now.Month(), now.Day(), 23, 59, 59, maxNano, loc)
// query for all rows in time range, return result count
results, fetchErr := fetchRowsInInterval(tbl, ctx, prefix, start, end)
if fetchErr != nil {
log.Printf("fetchRowsInInterval returned an error: %v", fetchErr)
return nil, fetchErr
}
result := map[string]int{"*": len(results)}
// iterate through the rows and increment the count for each index
if keySegments != 0 {
for _, row := range results {
countBy := makeGroupKey(keySegments, row.Key())
result[countBy] = result[countBy] + 1
}
}
return result, nil
}
// get number of recent transactions in the last 24 hours, and daily for a period
// optionally group by a EmitterChain or EmitterAddress
// optionally query for recent rows of a given EmitterChain or EmitterAddress
func Totals(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var numDays, groupBy, forChain, forAddress string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
numDays = queryParams.Get("numDays")
groupBy = queryParams.Get("groupBy")
forChain = queryParams.Get("forChain")
forAddress = queryParams.Get("forAddress")
readyCheck := queryParams.Get("readyCheck")
if readyCheck != "" {
// for running in devnet
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, html.EscapeString("ready"))
return
}
case http.MethodPost:
// declare request body properties
var d struct {
NumDays string `json:"numDays"`
GroupBy string `json:"groupBy"`
ForChain string `json:"forChain"`
ForAddress string `json:"forAddress"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
numDays = d.NumDays
groupBy = d.GroupBy
forChain = d.ForChain
forAddress = d.ForAddress
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
var queryDays int
if numDays == "" {
queryDays = 30
} else {
var convErr error
queryDays, convErr = strconv.Atoi(numDays)
if convErr != nil {
fmt.Fprint(w, "numDays must be an integer")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
// create bibtable client and open table
clientOnce.Do(func() {
// Declare a separate err variable to avoid shadowing client.
var err error
project := os.Getenv("GCP_PROJECT")
instance := os.Getenv("BIGTABLE_INSTANCE")
client, err = bigtable.NewClient(context.Background(), project, instance)
if err != nil {
http.Error(w, "Error initializing client", http.StatusInternalServerError)
log.Printf("bigtable.NewClient: %v", err)
return
}
})
tbl := client.Open("v2Events")
// create the rowkey prefix for querying
prefix := ""
if forChain != "" {
prefix = forChain
if forAddress != "" {
prefix = forChain + ":" + forAddress
}
}
// use the groupBy value to determine how many segements of the rowkey should be used.
keySegments := 0
if groupBy == "chain" {
keySegments = 1
}
if groupBy == "address" {
keySegments = 2
}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
var wg sync.WaitGroup
// total of last 24 hours
var last24HourCount map[string]int
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
last24HourInterval := -time.Duration(24) * time.Hour
defer wg.Done()
last24HourCount, err = messageCountForInterval(tbl, ctx, prefix, last24HourInterval, keySegments)
if err != nil {
log.Printf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
// total of the last 30 days
var periodCount map[string]int
wg.Add(1)
go func(prefix string, keySegments int) {
var err error
hours := (24 * queryDays)
periodInterval := -time.Duration(hours) * time.Hour
defer wg.Done()
periodCount, err = messageCountForInterval(tbl, ctx, prefix, periodInterval, keySegments)
if err != nil {
log.Fatalf("failed getting count for interval, err: %v", err)
}
}(prefix, keySegments)
// daily totals
var dailyTotals map[string]map[string]int
wg.Add(1)
go func(prefix string, keySegments int, queryDays int) {
var err error
defer wg.Done()
dailyTotals, err = createCountsOfInterval(tbl, ctx, prefix, queryDays, keySegments)
if err != nil {
log.Fatalf("failed getting createCountsOfInterval err %v", err)
}
}(prefix, keySegments, queryDays)
wg.Wait()
result := &totalsResult{
LastDayCount: last24HourCount,
TotalCount: periodCount,
DailyTotals: dailyTotals,
}
jsonBytes, err := json.Marshal(result)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
w.WriteHeader(http.StatusOK)
w.Write(jsonBytes)
}

View File

@ -0,0 +1,129 @@
// Package p contains an HTTP Cloud Function.
package p
import (
"context"
"encoding/json"
"fmt"
"html"
"io"
"log"
"net/http"
"os"
"cloud.google.com/go/bigtable"
)
// fetch a single row by transaction identifier
func Transaction(w http.ResponseWriter, r *http.Request) {
// Set CORS headers for the preflight request
if r.Method == http.MethodOptions {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "POST")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
w.Header().Set("Access-Control-Max-Age", "3600")
w.WriteHeader(http.StatusNoContent)
return
}
// Set CORS headers for the main request.
w.Header().Set("Access-Control-Allow-Origin", "*")
var transactionID string
// allow GET requests with querystring params, or POST requests with json body.
switch r.Method {
case http.MethodGet:
queryParams := r.URL.Query()
transactionID = queryParams.Get("id")
readyCheck := queryParams.Get("readyCheck")
if readyCheck != "" {
// for running in devnet
w.WriteHeader(http.StatusOK)
fmt.Fprint(w, html.EscapeString("ready"))
return
}
case http.MethodPost:
// declare request body properties
var d struct {
ID string `json:"id"`
}
// deserialize request body
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
switch err {
case io.EOF:
// do nothing, empty body is ok
default:
log.Printf("json.NewDecoder: %v", err)
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
return
}
}
transactionID = d.ID
default:
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
log.Println("Method Not Allowed")
return
}
if transactionID == "" {
fmt.Fprint(w, "id cannot be blank")
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
}
// create bibtable client and open table
clientOnce.Do(func() {
// Declare a separate err variable to avoid shadowing client.
var err error
project := os.Getenv("GCP_PROJECT")
instance := os.Getenv("BIGTABLE_INSTANCE")
client, err = bigtable.NewClient(context.Background(), project, instance)
if err != nil {
http.Error(w, "Error initializing client", http.StatusInternalServerError)
log.Printf("bigtable.NewClient: %v", err)
return
}
})
tbl := client.Open("v2Events")
var result bigtable.Row
readErr := tbl.ReadRows(r.Context(), bigtable.PrefixRange(""), func(row bigtable.Row) bool {
result = row
return true
}, bigtable.RowFilter(bigtable.ValueFilter(transactionID)))
if readErr != nil {
log.Fatalf("failed to read rows: %v", readErr)
}
if result == nil {
http.NotFound(w, r)
log.Printf("did not find row with transaction ID %v", transactionID)
return
}
key := result.Key()
row, err := tbl.ReadRow(r.Context(), key)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Fatalf("Could not read row with key %s: %v", key, err)
}
summary := makeSummary(row)
jsonBytes, err := json.Marshal(summary)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error()))
log.Println(err.Error())
return
}
w.WriteHeader(http.StatusOK)
w.Write(jsonBytes)
}

View File

@ -1,4 +1,4 @@
module github.com/certusone/wormhole/events_database
module github.com/certusone/wormhole/event_database
go 1.17

View File

@ -29,7 +29,8 @@ type bigTableWriter struct {
// rowKey returns a string with the input vales delimited by colons.
func makeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence uint64) string {
return fmt.Sprintf("%d:%s:%d", emitterChain, emitterAddress, sequence)
// left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically
return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence)
}
func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTableConnectionConfig) func(ctx context.Context) error {