From 5c8f1eb9a2dc533b94084a68301a40086451839b Mon Sep 17 00:00:00 2001 From: justinschuldt Date: Mon, 13 Dec 2021 12:33:23 -0600 Subject: [PATCH] BigTable: get solana address from token account commit-id:53bc09b1 --- event_database/cloud_functions/cmd/main.go | 24 +++----- .../cloud_functions/external-data.go | 57 +++++++++++++++++++ event_database/cloud_functions/process-vaa.go | 51 +++++++++++++---- 3 files changed, 104 insertions(+), 28 deletions(-) diff --git a/event_database/cloud_functions/cmd/main.go b/event_database/cloud_functions/cmd/main.go index f5800cc6..35aae77d 100644 --- a/event_database/cloud_functions/cmd/main.go +++ b/event_database/cloud_functions/cmd/main.go @@ -75,25 +75,17 @@ func main() { fmt.Println(fmt.Errorf("pubsub.NewClient err: %v", err)) } + pubsubTopicVAA := os.Getenv("PUBSUB_NEW_VAA_TOPIC") + pubsubSubscriptionVAA := os.Getenv("PUBSUB_NEW_VAA_SUBSCRIPTION") wg.Add(1) - go func() { - defer wg.Done() - - pubsubTopic := os.Getenv("PUBSUB_NEW_VAA_TOPIC") - pubsubSubscription := os.Getenv("PUBSUB_NEW_VAA_SUBSCRIPTION") - - createAndSubscribe(pubsubClient, pubsubTopic, pubsubSubscription, p.ProcessVAA) - }() + go createAndSubscribe(pubsubClient, pubsubTopicVAA, pubsubSubscriptionVAA, p.ProcessVAA) + wg.Done() + pubsubTopicTransfer := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC") + pubsubSubscriptionTransfer := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION") wg.Add(1) - go func() { - defer wg.Done() - - pubsubTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC") - pubsubSubscription := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION") - - createAndSubscribe(pubsubClient, pubsubTopic, pubsubSubscription, p.ProcessTransfer) - }() + go createAndSubscribe(pubsubClient, pubsubTopicTransfer, pubsubSubscriptionTransfer, p.ProcessTransfer) + wg.Done() wg.Wait() pubsubClient.Close() diff --git a/event_database/cloud_functions/external-data.go b/event_database/cloud_functions/external-data.go index 2974a8a2..a3346fd6 100644 --- a/event_database/cloud_functions/external-data.go +++ b/event_database/cloud_functions/external-data.go @@ -284,3 +284,60 @@ func fetchSolanaTokenList() map[string]SolanaToken { } return solTokens } + +const solanaBeachPublicBaseURL = "https://prod-api.solana.surf/v1/" +const solanaBeachPrivateBaseURL = "https://api.solanabeach.io/v1/" + +type SolanaBeachAccountOwner struct { + Owner SolanaBeachAccountOwnerAddress `json:"owner"` +} +type SolanaBeachAccountOwnerAddress struct { + Address string `json:"address"` +} +type SolanaBeachAccountResponse struct { + Value struct { + Extended struct { + SolanaBeachAccountOwner + } `json:"extended"` + } `json:"value"` +} + +func fetchSolanaAccountOwner(account string) string { + baseUrl := solanaBeachPublicBaseURL + + sbApiKey := os.Getenv("SOLANABEACH_API_KEY") + if sbApiKey != "" { + baseUrl = solanaBeachPrivateBaseURL + } + + url := fmt.Sprintf("%vaccount/%v", baseUrl, account) + req, reqErr := http.NewRequest("GET", url, nil) + if reqErr != nil { + log.Fatalf("failed solanabeach request, err: %v", reqErr) + } + + if sbApiKey != "" { + req.Header.Set("Authorization", fmt.Sprintf("Bearer %v", sbApiKey)) + } + + res, resErr := http.DefaultClient.Do(req) + if resErr != nil { + log.Fatalf("failed get solana beach account response, err: %v", resErr) + } + + defer res.Body.Close() + body, bodyErr := ioutil.ReadAll(res.Body) + if bodyErr != nil { + log.Fatalf("failed decoding solana beach account body, err: %v", bodyErr) + } + + var parsed SolanaBeachAccountResponse + + parseErr := json.Unmarshal(body, &parsed) + if parseErr != nil { + log.Printf("failed parsing body. err %v\n", parseErr) + } + address := parsed.Value.Extended.Owner.Address + log.Println("got owner address from Solana Beach! ", address) + return address +} diff --git a/event_database/cloud_functions/process-vaa.go b/event_database/cloud_functions/process-vaa.go index ed34b7a5..30d2d671 100644 --- a/event_database/cloud_functions/process-vaa.go +++ b/event_database/cloud_functions/process-vaa.go @@ -193,14 +193,16 @@ func makeRowKey(emitterChain vaa.ChainID, emitterAddress vaa.Address, sequence u // left-pad the sequence with zeros to 16 characters, because bigtable keys are stored lexicographically return fmt.Sprintf("%d:%s:%016d", emitterChain, emitterAddress, sequence) } -func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, mutation *bigtable.Mutation) error { +func writePayloadToBigTable(ctx context.Context, rowKey string, colFam string, mutation *bigtable.Mutation, forceWrite bool) error { + mut := mutation + if !forceWrite { + filter := bigtable.ChainFilters( + bigtable.FamilyFilter(colFam), + bigtable.ColumnFilter("PayloadId")) + mut = bigtable.NewCondMutation(filter, nil, mutation) + } - filter := bigtable.ChainFilters( - bigtable.FamilyFilter(colFam), - bigtable.ColumnFilter("PayloadId")) - conditionalMutation := bigtable.NewCondMutation(filter, nil, mutation) - - err := tbl.Apply(ctx, rowKey, conditionalMutation) + err := tbl.Apply(ctx, rowKey, mut) if err != nil { log.Printf("Failed to write payload for %v to BigTable. err: %v", rowKey, err) return err @@ -217,6 +219,19 @@ func TrimUnicodeFromByteArray(b []byte) []byte { return bytes.Trim(b, null+start+ack+tab+control) } +func addReceiverAddressToMutation(mut *bigtable.Mutation, ts bigtable.Timestamp, chainID uint16, hexAddress string) { + nativeAddress := transformHexAddressToNative(vaa.ChainID(chainID), hexAddress) + if vaa.ChainID(chainID) == vaa.ChainIDSolana { + ownerAddress := fetchSolanaAccountOwner(nativeAddress) + if ownerAddress == "" { + // exit with a failure code so the pubsub message is retried. + log.Fatalf("failed to find owner address for Solana account.") + } + nativeAddress = ownerAddress + } + mut.Set(columnFamilies[6], "ReceiverAddress", ts, []byte(nativeAddress)) +} + // ProcessVAA is triggered by a PubSub message, emitted after row is saved to BigTable by guardiand func ProcessVAA(ctx context.Context, m PubSubMessage) error { data := string(m.Data) @@ -249,6 +264,8 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { log.Println("failed decoding payload for row ", rowKey) return decodeErr } + log.Printf("Processing Transfer: Amount %v\n", fmt.Sprint(payload.Amount[3])) + // save payload to bigtable, then publish a new PubSub message for further processing colFam := columnFamilies[2] mutation := bigtable.NewMutation() @@ -260,13 +277,16 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { log.Printf("payload.Amount is larger than uint64 for row %v", rowKey) amount = payload.Amount.Bytes() } + targetAddressHex := hex.EncodeToString(payload.TargetAddress[:]) mutation.Set(colFam, "Amount", ts, amount) mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:]))) mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain))) - mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:]))) + mutation.Set(colFam, "TargetAddress", ts, []byte(targetAddressHex)) mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain))) - writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation) + addReceiverAddressToMutation(mutation, ts, payload.TargetChain, targetAddressHex) + + writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation, false) if writeErr != nil { return writeErr } @@ -299,6 +319,8 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { name = foundName } + log.Printf("Processing AssetMeta: Name %v, Symbol %v, coingeckoId %v\n", name, symbol, coinGeckoCoinId) + // save payload to bigtable colFam := columnFamilies[3] mutation := bigtable.NewMutation() @@ -313,7 +335,7 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { mutation.Set(colFam, "CoinGeckoCoinId", ts, []byte(coinGeckoCoinId)) mutation.Set(colFam, "NativeAddress", ts, []byte(nativeAddress)) - writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation) + writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation, false) return writeErr } else { // unknown payload type @@ -328,11 +350,14 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { log.Println("failed decoding payload for row ", rowKey) return decodeErr } + log.Printf("Processing NTF: Name %v, Symbol %v\n", string(TrimUnicodeFromByteArray(payload.Name[:])), string(TrimUnicodeFromByteArray(payload.Symbol[:]))) + // save payload to bigtable colFam := columnFamilies[4] mutation := bigtable.NewMutation() ts := bigtable.Now() + targetAddressHex := hex.EncodeToString(payload.TargetAddress[:]) mutation.Set(colFam, "PayloadId", ts, []byte(fmt.Sprint(payload.PayloadId))) mutation.Set(colFam, "OriginAddress", ts, []byte(hex.EncodeToString(payload.OriginAddress[:]))) mutation.Set(colFam, "OriginChain", ts, []byte(fmt.Sprint(payload.OriginChain))) @@ -340,10 +365,12 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { mutation.Set(colFam, "Name", ts, TrimUnicodeFromByteArray(payload.Name[:])) mutation.Set(colFam, "TokenId", ts, payload.TokenId.Bytes()) mutation.Set(colFam, "URI", ts, TrimUnicodeFromByteArray(payload.URI)) - mutation.Set(colFam, "TargetAddress", ts, []byte(hex.EncodeToString(payload.TargetAddress[:]))) + mutation.Set(colFam, "TargetAddress", ts, []byte(targetAddressHex)) mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain))) - writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation) + addReceiverAddressToMutation(mutation, ts, payload.TargetChain, targetAddressHex) + + writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation, false) return writeErr } else { // unknown payload type