Cloud function for BigTable queries
Change-Id: I8a2c0e349bdebeeeaf8b79f02285615b3a147884
This commit is contained in:
parent
37b654e7f9
commit
44fe7bd726
|
@ -26,7 +26,7 @@ go run . \
|
|||
-project wormhole-315720 \
|
||||
-instance wormhole-dev \
|
||||
-keyFilePath ./bigtable-admin.json \
|
||||
-queryRowKey ethereum:0xf15fe0e6dedef169a25696c577bfddf3d35707210ac641108646d7cfe507f174
|
||||
-queryRowKey 2:000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9:6
|
||||
```
|
||||
|
||||
Lookup all rows with a timestamp >= `queryPreviousMinutes` ago:
|
||||
|
|
|
@ -0,0 +1,35 @@
|
|||
## Google Cloud function for reading BigTable
|
||||
|
||||
This is a reference implementaion for getting data out of BigTable.
|
||||
|
||||
### invocation
|
||||
|
||||
both methods read the same data. just two different ways of querying:
|
||||
|
||||
GET
|
||||
|
||||
```bash
|
||||
curl "https://region-project-id.cloudfunctions.net/your-function-name?emitterChain=2&emitterAddress=000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9c&sequence=6"
|
||||
```
|
||||
|
||||
POST
|
||||
|
||||
```bash
|
||||
curl -X POST https://region-project-id.cloudfunctions.net/your-function-name \
|
||||
-H "Content-Type:application/json" \
|
||||
-d \
|
||||
'{"emitterChain":"2", "emitterAddress":"000000000000000000000000e982e462b094850f12af94d21d470e21be9d0e9c", "sequence":"6"}' | jq '.'
|
||||
|
||||
{
|
||||
"Message": {
|
||||
"InitiatingTxID": "0x47727f32a3c6033044fd9f11778c6b5691262533607a654fd020c068e5d12fba",
|
||||
"Payload": "AQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAAApD7FnIIr0VbsTd4AWO3t6mhDBYAAjsehQZiDTcv6/TspR/9xdoL+60kUe5xBKmz74vCab+YAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=="
|
||||
},
|
||||
"GuardianAddresses": [
|
||||
"0xbeFA429d57cD18b7F8A4d91A2da9AB4AF05d0FBe"
|
||||
],
|
||||
"SignedVAA": "AQAAAAABAIOSpeda6nEXWxJoS/d59cniULw0+DDSOVBxxOZPltunSM0BHgoJh6Srbg8Fa4eqLlifpCibLJx9MbJSwbXerZkAAAE4yuBzAQAAAgAAAAAAAAAAAAAAAOmC5GKwlIUPEq+U0h1HDiG+nQ6cAAAAAAAAAAYPAQAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAF9eEAAAAAAAAAAAAAAAAAApD7FnIIr0VbsTd4AWO3t6mhDBYAAjsehQZiDTcv6/TspR/9xdoL+60kUe5xBKmz74vCab+YAAEAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA==",
|
||||
"QuorumTime": "2021-08-11 00:16:11.757 +0000 UTC"
|
||||
}
|
||||
|
||||
```
|
|
@ -0,0 +1,168 @@
|
|||
// Package p contains an HTTP Cloud Function.
|
||||
package p
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"log"
|
||||
"net/http"
|
||||
"strings"
|
||||
"sync"
|
||||
|
||||
"cloud.google.com/go/bigtable"
|
||||
)
|
||||
|
||||
// client is a global Bigtable client, to avoid initializing a new client for
|
||||
// every request.
|
||||
var client *bigtable.Client
|
||||
var clientOnce sync.Once
|
||||
|
||||
var columnFamilies = []string{"MessagePublication", "Signatures", "VAAState", "QuorumState"}
|
||||
|
||||
type (
|
||||
MessagePub struct {
|
||||
InitiatingTxID string
|
||||
Payload []byte
|
||||
}
|
||||
Summary struct {
|
||||
Message MessagePub
|
||||
GuardianAddresses []string
|
||||
SignedVAA []byte
|
||||
QuorumTime string
|
||||
}
|
||||
)
|
||||
|
||||
func makeSummary(row bigtable.Row) *Summary {
|
||||
summary := &Summary{}
|
||||
if _, ok := row[columnFamilies[0]]; ok {
|
||||
|
||||
message := &MessagePub{}
|
||||
for _, item := range row[columnFamilies[0]] {
|
||||
switch item.Column {
|
||||
case "MessagePublication:InitiatingTxID":
|
||||
message.InitiatingTxID = string(item.Value)
|
||||
case "MessagePublication:Payload":
|
||||
message.Payload = item.Value
|
||||
}
|
||||
}
|
||||
summary.Message = *message
|
||||
}
|
||||
if _, ok := row[columnFamilies[1]]; ok {
|
||||
for _, item := range row[columnFamilies[1]] {
|
||||
column := strings.Split(item.Column, ":")
|
||||
summary.GuardianAddresses = append(summary.GuardianAddresses, column[1])
|
||||
}
|
||||
}
|
||||
if _, ok := row[columnFamilies[3]]; ok {
|
||||
|
||||
for _, item := range row[columnFamilies[3]] {
|
||||
if item.Column == "QuorumState:SignedVAA" {
|
||||
summary.SignedVAA = item.Value
|
||||
summary.QuorumTime = item.Timestamp.Time().String()
|
||||
}
|
||||
}
|
||||
}
|
||||
return summary
|
||||
}
|
||||
|
||||
func ReadRow(w http.ResponseWriter, r *http.Request) {
|
||||
// Set CORS headers for the preflight request
|
||||
if r.Method == http.MethodOptions {
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
w.Header().Set("Access-Control-Allow-Methods", "POST")
|
||||
w.Header().Set("Access-Control-Allow-Headers", "Content-Type")
|
||||
w.Header().Set("Access-Control-Max-Age", "3600")
|
||||
w.WriteHeader(http.StatusNoContent)
|
||||
return
|
||||
}
|
||||
// Set CORS headers for the main request.
|
||||
w.Header().Set("Access-Control-Allow-Origin", "*")
|
||||
|
||||
var rowKey string
|
||||
|
||||
// allow GET requests with querystring params, or POST requests with json body.
|
||||
switch r.Method {
|
||||
case http.MethodGet:
|
||||
queryParams := r.URL.Query()
|
||||
emitterChain := queryParams.Get("emitterChain")
|
||||
emitterAddress := queryParams.Get("emitterAddress")
|
||||
sequence := queryParams.Get("sequence")
|
||||
|
||||
// check for empty values
|
||||
if emitterChain == "" || emitterAddress == "" || sequence == "" {
|
||||
fmt.Fprint(w, "body values cannot be empty")
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
rowKey = emitterChain + ":" + emitterAddress + ":" + sequence
|
||||
case http.MethodPost:
|
||||
// declare request body properties
|
||||
var d struct {
|
||||
EmitterChain string `json:"emitterChain"`
|
||||
EmitterAddress string `json:"emitterAddress"`
|
||||
Sequence string `json:"sequence"`
|
||||
}
|
||||
|
||||
// deserialize request body
|
||||
if err := json.NewDecoder(r.Body).Decode(&d); err != nil {
|
||||
switch err {
|
||||
case io.EOF:
|
||||
fmt.Fprint(w, "request body required")
|
||||
return
|
||||
default:
|
||||
log.Printf("json.NewDecoder: %v", err)
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// check for empty values
|
||||
if d.EmitterChain == "" || d.EmitterAddress == "" || d.Sequence == "" {
|
||||
fmt.Fprint(w, "body values cannot be empty")
|
||||
http.Error(w, http.StatusText(http.StatusBadRequest), http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
rowKey = d.EmitterChain + ":" + d.EmitterAddress + ":" + d.Sequence
|
||||
default:
|
||||
http.Error(w, "405 - Method Not Allowed", http.StatusMethodNotAllowed)
|
||||
log.Println("Method Not Allowed")
|
||||
return
|
||||
}
|
||||
|
||||
clientOnce.Do(func() {
|
||||
// Declare a separate err variable to avoid shadowing client.
|
||||
var err error
|
||||
client, err = bigtable.NewClient(context.Background(), "wormhole-315720", "wormhole-dev")
|
||||
if err != nil {
|
||||
http.Error(w, "Error initializing client", http.StatusInternalServerError)
|
||||
log.Printf("bigtable.NewClient: %v", err)
|
||||
return
|
||||
}
|
||||
})
|
||||
|
||||
tbl := client.Open("v2Events")
|
||||
row, err := tbl.ReadRow(r.Context(), rowKey)
|
||||
if err != nil {
|
||||
http.Error(w, "Error reading rows", http.StatusInternalServerError)
|
||||
log.Printf("tbl.ReadRows(): %v", err)
|
||||
return
|
||||
}
|
||||
if row == nil {
|
||||
http.NotFound(w, r)
|
||||
log.Printf("did not find row for key %v", rowKey)
|
||||
return
|
||||
}
|
||||
|
||||
summary := makeSummary(row)
|
||||
jsonBytes, err := json.Marshal(summary)
|
||||
if err != nil {
|
||||
w.WriteHeader(http.StatusInternalServerError)
|
||||
w.Write([]byte(err.Error()))
|
||||
log.Println(err.Error())
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
w.Write(jsonBytes)
|
||||
}
|
|
@ -0,0 +1,8 @@
|
|||
module example.com/cloudfunction
|
||||
|
||||
// cloud runtime is go 1.13. just for reference.
|
||||
|
||||
require (
|
||||
cloud.google.com/go/bigtable v1.10.1
|
||||
google.golang.org/api v0.48.0
|
||||
)
|
Loading…
Reference in New Issue