From c4bced0e52ce459bfb7361111b60bf564ddf9bf2 Mon Sep 17 00:00:00 2001 From: justinschuldt Date: Tue, 16 Nov 2021 12:45:15 -0600 Subject: [PATCH] BigTable: query optimizations and return payloads Change-Id: If3a3c9a207518a26fbb8d924b5b9a9053c62c3a7 commit-id:00c2b83a --- event_database/cloud_functions/process-vaa.go | 38 +++- event_database/cloud_functions/readrow.go | 4 +- event_database/cloud_functions/shared.go | 199 ++++++++++++++++-- event_database/cloud_functions/totals.go | 1 + event_database/cloud_functions/transaction.go | 2 +- 5 files changed, 217 insertions(+), 27 deletions(-) diff --git a/event_database/cloud_functions/process-vaa.go b/event_database/cloud_functions/process-vaa.go index 4fc5f4cf2..88401ceb6 100644 --- a/event_database/cloud_functions/process-vaa.go +++ b/event_database/cloud_functions/process-vaa.go @@ -45,6 +45,9 @@ var TokenTransferEmitters = map[string]string{ "000000000000000000000000784999135aaa8a3ca5914468852fdddbddd8789d": "terra10pyejy66429refv3g35g2t7am0was7ya7kz2a4", // terra } +// this address is an emitter for BSC and Polygon. +var sharedEmitterAddress = "0000000000000000000000005a58505a96d1dbf8df91cb21b54419fc36e93fde" + type ( TokenTransfer struct { PayloadId uint8 @@ -204,6 +207,15 @@ func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, m } return nil } +func TrimUnicodeFromByteArray(b []byte) []byte { + // Escaped Unicode that has been observed in payload's token names and symbol: + null := "\u0000" + start := "\u0002" + ack := "\u0006" + tab := "\u0009" + control := "\u0012" + return bytes.Trim(b, null+start+ack+tab+control) +} // ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand func ProcessVAA(ctx context.Context, m PubSubMessage) error { @@ -223,7 +235,11 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { emitterHex := signedVaa.EmitterAddress.String() payloadId := int(signedVaa.Payload[0]) - if _, ok := TokenTransferEmitters[emitterHex]; ok { + // BSC and Polygon have the same contract address: "0x5a58505a96d1dbf8df91cb21b54419fc36e93fde". + // The BSC contract is the NFT emitter address. + // The Polygon contract is the token transfer emitter address. + // Due to that, ensure that the block below only runs for token transfers by checking for chain == 4 and emitter addaress. + if _, ok := TokenTransferEmitters[emitterHex]; ok && !(signedVaa.EmitterChain == 4 && signedVaa.EmitterAddress.String() == sharedEmitterAddress) { // figure out if it's a transfer or asset metadata if payloadId == 1 { @@ -239,7 +255,12 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { ts := bigtable.Now() mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId))) // TODO: find a better way of representing amount as a string - mutation.Set(colFam, "Amount", ts, []byte(fmt.Sprint(payload.Amount[3]))) + amount := []byte(fmt.Sprint(payload.Amount[3])) + if payload.Amount[2] != 0 { + log.Printf("payload.Amount is larger than uint64 for row %v", rowKey) + amount = payload.Amount.Bytes() + } + mutation.Set(colFam, "Amount", ts, amount) mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:]))) mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain))) mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:]))) @@ -270,8 +291,8 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { mutation.Set(colFam, "TokenAddress", ts, []byte(hex.EncodeToString(payload.TokenAddress[:]))) mutation.Set(colFam, "TokenChain", ts, []byte(fmt.Sprint(payload.TokenChain))) mutation.Set(colFam, "Decimals", ts, []byte(fmt.Sprint(payload.Decimals))) - mutation.Set(colFam, "Name", ts, []byte(payload.Name[:])) - mutation.Set(colFam, "Symbol", ts, []byte(payload.Symbol[:])) + mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:])) + mutation.Set(colFam, "Symbol", ts, TrimUnicodeFromByteArray(payload.Symbol[:])) writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation) if writeErr != nil { log.Println("wrote TokenTransferPayload to bigtable!", rowKey) @@ -298,11 +319,10 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId))) mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:]))) mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain))) - mutation.Set(colFam, "Symbol", ts, []byte(payload.Symbol[:])) - mutation.Set(colFam, "Name", ts, []byte(payload.Name[:])) - // TODO: find a better way of representing tokenId as a string - mutation.Set(colFam, "TokenId", ts, []byte(fmt.Sprint(payload.TokenId[3]))) - mutation.Set(colFam, "URI", ts, []byte(payload.URI)) + mutation.Set(colFam, "Symbol", ts, TrimUnicodeFromByteArray(payload.Symbol[:])) + mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:])) + mutation.Set(colFam, "TokenId", ts, payload.TokenId.Bytes()) + mutation.Set(colFam, "URI", ts, TrimUnicodeFromByteArray(payload.URI)) mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:]))) mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain))) diff --git a/event_database/cloud_functions/readrow.go b/event_database/cloud_functions/readrow.go index 4acbf1493..64bb6ba11 100644 --- a/event_database/cloud_functions/readrow.go +++ b/event_database/cloud_functions/readrow.go @@ -9,6 +9,8 @@ import ( "log" "net/http" "strings" + + "cloud.google.com/go/bigtable" ) // fetch a single row by the row key @@ -105,7 +107,7 @@ func ReadRow(w http.ResponseWriter, r *http.Request) { } rowKey = emitterChain + ":" + emitterAddress + ":" + sequence - row, err := tbl.ReadRow(r.Context(), rowKey) + row, err := tbl.ReadRow(r.Context(), rowKey, bigtable.RowFilter(bigtable.LatestNFilter(1))) if err != nil { http.Error(w, "Error reading rows", http.StatusInternalServerError) log.Printf("tbl.ReadRows(): %v", err) diff --git a/event_database/cloud_functions/shared.go b/event_database/cloud_functions/shared.go index a210e5d05..420740061 100644 --- a/event_database/cloud_functions/shared.go +++ b/event_database/cloud_functions/shared.go @@ -73,8 +73,16 @@ var columnFamilies = []string{ "TokenTransferDetails", "ChainDetails", } +var messagePubFam = columnFamilies[0] +var quorumStateFam = columnFamilies[1] +var transferPayloadFam = columnFamilies[2] +var metaPayloadFam = columnFamilies[3] +var nftPayloadFam = columnFamilies[4] +var transferDetailsFam = columnFamilies[5] +var chainDetailsFam = columnFamilies[6] type ( + // Summary is MessagePublication data & QuorumState data Summary struct { EmitterChain string EmitterAddress string @@ -84,16 +92,57 @@ type ( SignedVAABytes []byte QuorumTime string } - // Details is a Summary, with the VAA decoded as SignedVAA + // Details is a Summary extended with all the post-processing ColumnFamilies Details struct { - SignedVAA *vaa.VAA - EmitterChain string - EmitterAddress string - Sequence string - InitiatingTxID string - Payload []byte - SignedVAABytes []byte - QuorumTime string + Summary + SignedVAA *vaa.VAA + TokenTransferPayload *TokenTransferPayload + AssetMetaPayload *AssetMetaPayload + NFTTransferPayload *NFTTransferPayload + TransferDetails *TransferDetails + ChainDetails *ChainDetails + } + // The following structs match the ColumnFamiles they are named after + TokenTransferPayload struct { + Amount string + OriginAddress string + OriginChain string + TargetAddress string + TargetChain string + } + AssetMetaPayload struct { + TokenAddress string + TokenChain string + Decimals string + Symbol string + Name string + } + NFTTransferPayload struct { + OriginAddress string + OriginChain string + Symbol string + Name string + TokenId string + URI string + TargetAddress string + TargetChain string + } + TransferDetails struct { + Amount string + Decimals string + NotionalUSDStr string + TokenPriceUSDStr string + TransferTimestamp string + OriginSymbol string + OriginName string + OriginTokenAddress string + // fields below exist on the row, but no need to return them currently. + // NotionalUSD uint64 + // TokenPriceUSD uint64 + } + ChainDetails struct { + SenderAddress string + ReceiverAddress string } ) @@ -115,9 +164,9 @@ func chainIdStringToType(chainId string) vaa.ChainID { func makeSummary(row bigtable.Row) *Summary { summary := &Summary{} - if _, ok := row[columnFamilies[0]]; ok { + if _, ok := row[messagePubFam]; ok { - for _, item := range row[columnFamilies[0]] { + for _, item := range row[messagePubFam] { switch item.Column { case "MessagePublication:InitiatingTxID": summary.InitiatingTxID = string(item.Value) @@ -144,8 +193,8 @@ func makeSummary(row bigtable.Row) *Summary { } summary.Sequence = seq } - if _, ok := row[columnFamilies[1]]; ok { - item := row[columnFamilies[1]][0] + if _, ok := row[quorumStateFam]; ok { + item := row[quorumStateFam][0] summary.SignedVAABytes = item.Value summary.QuorumTime = item.Timestamp.Time().String() } @@ -153,8 +202,9 @@ func makeSummary(row bigtable.Row) *Summary { } func makeDetails(row bigtable.Row) *Details { + deets := &Details{} sum := makeSummary(row) - deets := &Details{ + deets.Summary = Summary{ EmitterChain: sum.EmitterChain, EmitterAddress: sum.EmitterAddress, Sequence: sum.Sequence, @@ -163,10 +213,127 @@ func makeDetails(row bigtable.Row) *Details { SignedVAABytes: sum.SignedVAABytes, QuorumTime: sum.QuorumTime, } - if _, ok := row[columnFamilies[1]]; ok { - item := row[columnFamilies[1]][0] + + if _, ok := row[quorumStateFam]; ok { + item := row[quorumStateFam][0] deets.SignedVAA, _ = vaa.Unmarshal(item.Value) } + if _, ok := row[transferPayloadFam]; ok { + tokenTransferPayload := &TokenTransferPayload{} + for _, item := range row[transferPayloadFam] { + switch item.Column { + case "TokenTransferPayload:Amount": + tokenTransferPayload.Amount = string(item.Value) + case "TokenTransferPayload:OriginAddress": + tokenTransferPayload.OriginAddress = string(item.Value) + case "TokenTransferPayload:OriginChain": + tokenTransferPayload.OriginChain = string(item.Value) + case "TokenTransferPayload:TargetAddress": + tokenTransferPayload.TargetAddress = string(item.Value) + case "TokenTransferPayload:TargetChain": + tokenTransferPayload.TargetChain = string(item.Value) + } + } + deets.TokenTransferPayload = tokenTransferPayload + } + if _, ok := row[metaPayloadFam]; ok { + assetMetaPayload := &AssetMetaPayload{} + for _, item := range row[metaPayloadFam] { + switch item.Column { + case "AssetMetaPayload:TokenAddress": + assetMetaPayload.TokenAddress = string(item.Value) + case "AssetMetaPayload:TokenChain": + assetMetaPayload.TokenChain = string(item.Value) + case "AssetMetaPayload:Decimals": + assetMetaPayload.Decimals = string(item.Value) + case "AssetMetaPayload:Symbol": + assetMetaPayload.Symbol = string(item.Value) + case "AssetMetaPayload:Name": + assetMetaPayload.Name = string(item.Value) + } + } + deets.AssetMetaPayload = assetMetaPayload + } + if _, ok := row[nftPayloadFam]; ok { + nftTransferPayload := &NFTTransferPayload{} + for _, item := range row[nftPayloadFam] { + switch item.Column { + case "NFTTransferPayload:OriginAddress": + nftTransferPayload.OriginAddress = string(item.Value) + case "NFTTransferPayload:OriginChain": + nftTransferPayload.OriginChain = string(item.Value) + case "NFTTransferPayload:Symbol": + nftTransferPayload.Symbol = string(item.Value) + case "NFTTransferPayload:Name": + nftTransferPayload.Name = string(item.Value) + case "NFTTransferPayload:TokenId": + nftTransferPayload.TokenId = string(item.Value) + case "NFTTransferPayload:URI": + nftTransferPayload.URI = string(TrimUnicodeFromByteArray(item.Value)) + case "NFTTransferPayload:TargetAddress": + nftTransferPayload.TargetAddress = string(item.Value) + case "NFTTransferPayload:TargetChain": + nftTransferPayload.TargetChain = string(item.Value) + } + } + deets.NFTTransferPayload = nftTransferPayload + } + if _, ok := row[transferDetailsFam]; ok { + transferDetails := &TransferDetails{} + for _, item := range row[transferDetailsFam] { + switch item.Column { + case "TokenTransferDetails:Amount": + transferDetails.Amount = string(item.Value) + case "TokenTransferDetails:Decimals": + transferDetails.Decimals = string(item.Value) + case "TokenTransferDetails:NotionalUSDStr": + transferDetails.NotionalUSDStr = string(item.Value) + case "TokenTransferDetails:TokenPriceUSDStr": + transferDetails.TokenPriceUSDStr = string(item.Value) + case "TokenTransferDetails:TransferTimestamp": + transferDetails.TransferTimestamp = string(item.Value) + case "TokenTransferDetails:OriginSymbol": + transferDetails.OriginSymbol = string(item.Value) + case "TokenTransferDetails:OriginName": + transferDetails.OriginName = string(item.Value) + case "TokenTransferDetails:OriginTokenAddress": + transferDetails.OriginTokenAddress = string(item.Value) + + // NotionalUSD and TokenPriceUSD are more percise than the string versions returned, + // however the precision is not required, so leaving this commented out for now. + + // case "TokenTransferDetails:NotionalUSD": + // reader := bytes.NewReader(item.Value) + // var notionalUSD uint64 + // if err := binary.Read(reader, binary.BigEndian, ¬ionalUSD); err != nil { + // log.Fatalf("failed to read NotionalUSD of row: %v. err %v ", row.Key(), err) + // } + // transferDetails.NotionalUSD = notionalUSD + + // case "TokenTransferDetails:TokenPriceUSD": + // reader := bytes.NewReader(item.Value) + // var tokenPriceUSD uint64 + // if err := binary.Read(reader, binary.BigEndian, &tokenPriceUSD); err != nil { + // log.Fatalf("failed to read TokenPriceUSD of row: %v. err %v", row.Key(), err) + // } + // transferDetails.NotionalUSD = tokenPriceUSD + + } + } + deets.TransferDetails = transferDetails + } + if _, ok := row[chainDetailsFam]; ok { + chainDetails := &ChainDetails{} + for _, item := range row[chainDetailsFam] { + switch item.Column { + case "ChainDetails:SenderAddress": + chainDetails.SenderAddress = string(item.Value) + case "ChainDetails:ReceiverAddress": + chainDetails.ReceiverAddress = string(item.Value) + } + } + deets.ChainDetails = chainDetails + } return deets } diff --git a/event_database/cloud_functions/totals.go b/event_database/cloud_functions/totals.go index 4268836b4..b7bad510b 100644 --- a/event_database/cloud_functions/totals.go +++ b/event_database/cloud_functions/totals.go @@ -54,6 +54,7 @@ func fetchRowsInInterval(tbl *bigtable.Table, ctx context.Context, prefix string }, bigtable.RowFilter( bigtable.ChainFilters( // combine filters to get only what we need: + bigtable.FamilyFilter(columnFamilies[1]), bigtable.CellsPerRowLimitFilter(1), // only the first cell in each column (helps for devnet where sequence resets) bigtable.TimestampRangeFilter(start, end), // within time range bigtable.StripValueFilter(), // no columns/values, just the row.Key() diff --git a/event_database/cloud_functions/transaction.go b/event_database/cloud_functions/transaction.go index 96e7368f1..657f68dff 100644 --- a/event_database/cloud_functions/transaction.go +++ b/event_database/cloud_functions/transaction.go @@ -92,7 +92,7 @@ func Transaction(w http.ResponseWriter, r *http.Request) { } key := result.Key() - row, err := tbl.ReadRow(r.Context(), key) + row, err := tbl.ReadRow(r.Context(), key, bigtable.RowFilter(bigtable.LatestNFilter(1))) if err != nil { w.WriteHeader(http.StatusInternalServerError) w.Write([]byte(err.Error()))