BigTable: query optimizations and return payloads

Change-Id: If3a3c9a207518a26fbb8d924b5b9a9053c62c3a7

commit-id:00c2b83a
This commit is contained in:
justinschuldt 2021-11-16 12:45:15 -06:00 committed by Leopold Schabel
parent ba5d55c1b9
commit c4bced0e52
5 changed files with 217 additions and 27 deletions

View File

@ -45,6 +45,9 @@ var TokenTransferEmitters = map[string]string{
"000000000000000000000000784999135aaa8a3ca5914468852fdddbddd8789d": "terra10pyejy66429refv3g35g2t7am0was7ya7kz2a4", // terra "000000000000000000000000784999135aaa8a3ca5914468852fdddbddd8789d": "terra10pyejy66429refv3g35g2t7am0was7ya7kz2a4", // terra
} }
// this address is an emitter for BSC and Polygon.
var sharedEmitterAddress = "0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde"
type ( type (
TokenTransfer struct { TokenTransfer struct {
PayloadId uint8 PayloadId uint8
@ -204,6 +207,15 @@ func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, m
} }
return nil return nil
} }
func TrimUnicodeFromByteArray(b []byte) []byte {
// Escaped Unicode that has been observed in payload's token names and symbol:
null := "\u0000"
start := "\u0002"
ack := "\u0006"
tab := "\u0009"
control := "\u0012"
return bytes.Trim(b, null+start+ack+tab+control)
}
// ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand // ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand
func ProcessVAA(ctx context.Context, m PubSubMessage) error { func ProcessVAA(ctx context.Context, m PubSubMessage) error {
@ -223,7 +235,11 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
emitterHex := signedVaa.EmitterAddress.String() emitterHex := signedVaa.EmitterAddress.String()
payloadId := int(signedVaa.Payload[0]) payloadId := int(signedVaa.Payload[0])
if _, ok := TokenTransferEmitters[emitterHex]; ok { // BSC and Polygon have the same contract address: "0x5a58505a96d1dbf8df91cb21b54419fc36e93fde".
// The BSC contract is the NFT emitter address.
// The Polygon contract is the token transfer emitter address.
// Due to that, ensure that the block below only runs for token transfers by checking for chain == 4 and emitter addaress.
if _, ok := TokenTransferEmitters[emitterHex]; ok && !(signedVaa.EmitterChain == 4 && signedVaa.EmitterAddress.String() == sharedEmitterAddress) {
// figure out if it's a transfer or asset metadata // figure out if it's a transfer or asset metadata
if payloadId == 1 { if payloadId == 1 {
@ -239,7 +255,12 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
ts := bigtable.Now() ts := bigtable.Now()
mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId))) mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
// TODO: find a better way of representing amount as a string // TODO: find a better way of representing amount as a string
mutation.Set(colFam, "Amount", ts, []byte(fmt.Sprint(payload.Amount[3]))) amount := []byte(fmt.Sprint(payload.Amount[3]))
if payload.Amount[2] != 0 {
log.Printf("payload.Amount is larger than uint64 for row %v", rowKey)
amount = payload.Amount.Bytes()
}
mutation.Set(colFam, "Amount", ts, amount)
mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:]))) mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain))) mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:]))) mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
@ -270,8 +291,8 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
mutation.Set(colFam, "TokenAddress", ts, []byte(hex.EncodeToString(payload.TokenAddress[:]))) mutation.Set(colFam, "TokenAddress", ts, []byte(hex.EncodeToString(payload.TokenAddress[:])))
mutation.Set(colFam, "TokenChain", ts, []byte(fmt.Sprint(payload.TokenChain))) mutation.Set(colFam, "TokenChain", ts, []byte(fmt.Sprint(payload.TokenChain)))
mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(payload.Decimals))) mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(payload.Decimals)))
mutation.Set(colFam, "Name", ts, []byte(payload.Name[:])) mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:]))
mutation.Set(colFam, "Symbol", ts, []byte(payload.Symbol[:])) mutation.Set(colFam, "Symbol", ts, TrimUnicodeFromByteArray(payload.Symbol[:]))
writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation) writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation)
if writeErr != nil { if writeErr != nil {
log.Println("wrote TokenTransferPayload to bigtable!", rowKey) log.Println("wrote TokenTransferPayload to bigtable!", rowKey)
@ -298,11 +319,10 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error {
mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId))) mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId)))
mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:]))) mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:])))
mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain))) mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain)))
mutation.Set(colFam, "Symbol", ts, []byte(payload.Symbol[:])) mutation.Set(colFam, "Symbol", ts, TrimUnicodeFromByteArray(payload.Symbol[:]))
mutation.Set(colFam, "Name", ts, []byte(payload.Name[:])) mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:]))
// TODO: find a better way of representing tokenId as a string mutation.Set(colFam, "TokenId", ts, payload.TokenId.Bytes())
mutation.Set(colFam, "TokenId", ts, []byte(fmt.Sprint(payload.TokenId[3]))) mutation.Set(colFam, "URI", ts, TrimUnicodeFromByteArray(payload.URI))
mutation.Set(colFam, "URI", ts, []byte(payload.URI))
mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:]))) mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:])))
mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain))) mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain)))

View File

@ -9,6 +9,8 @@ import (
"log" "log"
"net/http" "net/http"
"strings" "strings"
"cloud.google.com/go/bigtable"
) )
// fetch a single row by the row key // fetch a single row by the row key
@ -105,7 +107,7 @@ func ReadRow(w http.ResponseWriter, r *http.Request) {
} }
rowKey = emitterChain + ":" + emitterAddress + ":" + sequence rowKey = emitterChain + ":" + emitterAddress + ":" + sequence
row, err := tbl.ReadRow(r.Context(), rowKey) row, err := tbl.ReadRow(r.Context(), rowKey, bigtable.RowFilter(bigtable.LatestNFilter(1)))
if err != nil { if err != nil {
http.Error(w, "Error reading rows", http.StatusInternalServerError) http.Error(w, "Error reading rows", http.StatusInternalServerError)
log.Printf("tbl.ReadRows(): %v", err) log.Printf("tbl.ReadRows(): %v", err)

View File

@ -73,8 +73,16 @@ var columnFamilies = []string{
"TokenTransferDetails", "TokenTransferDetails",
"ChainDetails", "ChainDetails",
} }
var messagePubFam = columnFamilies[0]
var quorumStateFam = columnFamilies[1]
var transferPayloadFam = columnFamilies[2]
var metaPayloadFam = columnFamilies[3]
var nftPayloadFam = columnFamilies[4]
var transferDetailsFam = columnFamilies[5]
var chainDetailsFam = columnFamilies[6]
type ( type (
// Summary is MessagePublication data & QuorumState data
Summary struct { Summary struct {
EmitterChain string EmitterChain string
EmitterAddress string EmitterAddress string
@ -84,16 +92,57 @@ type (
SignedVAABytes []byte SignedVAABytes []byte
QuorumTime string QuorumTime string
} }
// Details is a Summary, with the VAA decoded as SignedVAA // Details is a Summary extended with all the post-processing ColumnFamilies
Details struct { Details struct {
SignedVAA *vaa.VAA Summary
EmitterChain string SignedVAA *vaa.VAA
EmitterAddress string TokenTransferPayload *TokenTransferPayload
Sequence string AssetMetaPayload *AssetMetaPayload
InitiatingTxID string NFTTransferPayload *NFTTransferPayload
Payload []byte TransferDetails *TransferDetails
SignedVAABytes []byte ChainDetails *ChainDetails
QuorumTime string }
// The following structs match the ColumnFamiles they are named after
TokenTransferPayload struct {
Amount string
OriginAddress string
OriginChain string
TargetAddress string
TargetChain string
}
AssetMetaPayload struct {
TokenAddress string
TokenChain string
Decimals string
Symbol string
Name string
}
NFTTransferPayload struct {
OriginAddress string
OriginChain string
Symbol string
Name string
TokenId string
URI string
TargetAddress string
TargetChain string
}
TransferDetails struct {
Amount string
Decimals string
NotionalUSDStr string
TokenPriceUSDStr string
TransferTimestamp string
OriginSymbol string
OriginName string
OriginTokenAddress string
// fields below exist on the row, but no need to return them currently.
// NotionalUSD uint64
// TokenPriceUSD uint64
}
ChainDetails struct {
SenderAddress string
ReceiverAddress string
} }
) )
@ -115,9 +164,9 @@ func chainIdStringToType(chainId string) vaa.ChainID {
func makeSummary(row bigtable.Row) *Summary { func makeSummary(row bigtable.Row) *Summary {
summary := &Summary{} summary := &Summary{}
if _, ok := row[columnFamilies[0]]; ok { if _, ok := row[messagePubFam]; ok {
for _, item := range row[columnFamilies[0]] { for _, item := range row[messagePubFam] {
switch item.Column { switch item.Column {
case "MessagePublication:InitiatingTxID": case "MessagePublication:InitiatingTxID":
summary.InitiatingTxID = string(item.Value) summary.InitiatingTxID = string(item.Value)
@ -144,8 +193,8 @@ func makeSummary(row bigtable.Row) *Summary {
} }
summary.Sequence = seq summary.Sequence = seq
} }
if _, ok := row[columnFamilies[1]]; ok { if _, ok := row[quorumStateFam]; ok {
item := row[columnFamilies[1]][0] item := row[quorumStateFam][0]
summary.SignedVAABytes = item.Value summary.SignedVAABytes = item.Value
summary.QuorumTime = item.Timestamp.Time().String() summary.QuorumTime = item.Timestamp.Time().String()
} }
@ -153,8 +202,9 @@ func makeSummary(row bigtable.Row) *Summary {
} }
func makeDetails(row bigtable.Row) *Details { func makeDetails(row bigtable.Row) *Details {
deets := &Details{}
sum := makeSummary(row) sum := makeSummary(row)
deets := &Details{ deets.Summary = Summary{
EmitterChain: sum.EmitterChain, EmitterChain: sum.EmitterChain,
EmitterAddress: sum.EmitterAddress, EmitterAddress: sum.EmitterAddress,
Sequence: sum.Sequence, Sequence: sum.Sequence,
@ -163,10 +213,127 @@ func makeDetails(row bigtable.Row) *Details {
SignedVAABytes: sum.SignedVAABytes, SignedVAABytes: sum.SignedVAABytes,
QuorumTime: sum.QuorumTime, QuorumTime: sum.QuorumTime,
} }
if _, ok := row[columnFamilies[1]]; ok {
item := row[columnFamilies[1]][0] if _, ok := row[quorumStateFam]; ok {
item := row[quorumStateFam][0]
deets.SignedVAA, _ = vaa.Unmarshal(item.Value) deets.SignedVAA, _ = vaa.Unmarshal(item.Value)
} }
if _, ok := row[transferPayloadFam]; ok {
tokenTransferPayload := &TokenTransferPayload{}
for _, item := range row[transferPayloadFam] {
switch item.Column {
case "TokenTransferPayload:Amount":
tokenTransferPayload.Amount = string(item.Value)
case "TokenTransferPayload:OriginAddress":
tokenTransferPayload.OriginAddress = string(item.Value)
case "TokenTransferPayload:OriginChain":
tokenTransferPayload.OriginChain = string(item.Value)
case "TokenTransferPayload:TargetAddress":
tokenTransferPayload.TargetAddress = string(item.Value)
case "TokenTransferPayload:TargetChain":
tokenTransferPayload.TargetChain = string(item.Value)
}
}
deets.TokenTransferPayload = tokenTransferPayload
}
if _, ok := row[metaPayloadFam]; ok {
assetMetaPayload := &AssetMetaPayload{}
for _, item := range row[metaPayloadFam] {
switch item.Column {
case "AssetMetaPayload:TokenAddress":
assetMetaPayload.TokenAddress = string(item.Value)
case "AssetMetaPayload:TokenChain":
assetMetaPayload.TokenChain = string(item.Value)
case "AssetMetaPayload:Decimals":
assetMetaPayload.Decimals = string(item.Value)
case "AssetMetaPayload:Symbol":
assetMetaPayload.Symbol = string(item.Value)
case "AssetMetaPayload:Name":
assetMetaPayload.Name = string(item.Value)
}
}
deets.AssetMetaPayload = assetMetaPayload
}
if _, ok := row[nftPayloadFam]; ok {
nftTransferPayload := &NFTTransferPayload{}
for _, item := range row[nftPayloadFam] {
switch item.Column {
case "NFTTransferPayload:OriginAddress":
nftTransferPayload.OriginAddress = string(item.Value)
case "NFTTransferPayload:OriginChain":
nftTransferPayload.OriginChain = string(item.Value)
case "NFTTransferPayload:Symbol":
nftTransferPayload.Symbol = string(item.Value)
case "NFTTransferPayload:Name":
nftTransferPayload.Name = string(item.Value)
case "NFTTransferPayload:TokenId":
nftTransferPayload.TokenId = string(item.Value)
case "NFTTransferPayload:URI":
nftTransferPayload.URI = string(TrimUnicodeFromByteArray(item.Value))
case "NFTTransferPayload:TargetAddress":
nftTransferPayload.TargetAddress = string(item.Value)
case "NFTTransferPayload:TargetChain":
nftTransferPayload.TargetChain = string(item.Value)
}
}
deets.NFTTransferPayload = nftTransferPayload
}
if _, ok := row[transferDetailsFam]; ok {
transferDetails := &TransferDetails{}
for _, item := range row[transferDetailsFam] {
switch item.Column {
case "TokenTransferDetails:Amount":
transferDetails.Amount = string(item.Value)
case "TokenTransferDetails:Decimals":
transferDetails.Decimals = string(item.Value)
case "TokenTransferDetails:NotionalUSDStr":
transferDetails.NotionalUSDStr = string(item.Value)
case "TokenTransferDetails:TokenPriceUSDStr":
transferDetails.TokenPriceUSDStr = string(item.Value)
case "TokenTransferDetails:TransferTimestamp":
transferDetails.TransferTimestamp = string(item.Value)
case "TokenTransferDetails:OriginSymbol":
transferDetails.OriginSymbol = string(item.Value)
case "TokenTransferDetails:OriginName":
transferDetails.OriginName = string(item.Value)
case "TokenTransferDetails:OriginTokenAddress":
transferDetails.OriginTokenAddress = string(item.Value)
// NotionalUSD and TokenPriceUSD are more percise than the string versions returned,
// however the precision is not required, so leaving this commented out for now.
// case "TokenTransferDetails:NotionalUSD":
// reader := bytes.NewReader(item.Value)
// var notionalUSD uint64
// if err := binary.Read(reader, binary.BigEndian, &notionalUSD); err != nil {
// log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err)
// }
// transferDetails.NotionalUSD = notionalUSD
// case "TokenTransferDetails:TokenPriceUSD":
// reader := bytes.NewReader(item.Value)
// var tokenPriceUSD uint64
// if err := binary.Read(reader, binary.BigEndian, &tokenPriceUSD); err != nil {
// log.Fatalf("failed to read TokenPriceUSD of row: %v. err %v", row.Key(), err)
// }
// transferDetails.NotionalUSD = tokenPriceUSD
}
}
deets.TransferDetails = transferDetails
}
if _, ok := row[chainDetailsFam]; ok {
chainDetails := &ChainDetails{}
for _, item := range row[chainDetailsFam] {
switch item.Column {
case "ChainDetails:SenderAddress":
chainDetails.SenderAddress = string(item.Value)
case "ChainDetails:ReceiverAddress":
chainDetails.ReceiverAddress = string(item.Value)
}
}
deets.ChainDetails = chainDetails
}
return deets return deets
} }

View File

@ -54,6 +54,7 @@ func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string
}, bigtable.RowFilter( }, bigtable.RowFilter(
bigtable.ChainFilters( bigtable.ChainFilters(
// combine filters to get only what we need: // combine filters to get only what we need:
bigtable.FamilyFilter(columnFamilies[1]),
bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets) bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets)
bigtable.TimestampRangeFilter(start, end), // within time range bigtable.TimestampRangeFilter(start, end), // within time range
bigtable.StripValueFilter(), // no columns/values, just the row.Key() bigtable.StripValueFilter(), // no columns/values, just the row.Key()

View File

@ -92,7 +92,7 @@ func Transaction(w http.ResponseWriter, r *http.Request) {
} }
key := result.Key() key := result.Key()
row, err := tbl.ReadRow(r.Context(), key) row, err := tbl.ReadRow(r.Context(), key, bigtable.RowFilter(bigtable.LatestNFilter(1)))
if err != nil { if err != nil {
w.WriteHeader(http.StatusInternalServerError) w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte(err.Error())) w.Write([]byte(err.Error()))