diff --git a/devnet/bigtable.yaml b/devnet/bigtable.yaml index 36ab15a6d..402d66129 100644 --- a/devnet/bigtable.yaml +++ b/devnet/bigtable.yaml @@ -158,10 +158,14 @@ spec: value: local-dev - name: BIGTABLE_INSTANCE value: wormhole - - name: PUBSUB_TOPIC + - name: PUBSUB_NEW_VAA_TOPIC value: new-vaa-devnet - - name: PUBSUB_SUBSCRIPTION + - name: PUBSUB_NEW_VAA_SUBSCRIPTION value: extract-payload-devnet + - name: PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC + value: create-token-transfer-details-devnet + - name: PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION + value: calculate-transfer-data-devnet ports: - containerPort: 8080 name: functions diff --git a/event_database/cloud_functions/.vscode/launch.json b/event_database/cloud_functions/.vscode/launch.json index 9246d282a..c120434cf 100644 --- a/event_database/cloud_functions/.vscode/launch.json +++ b/event_database/cloud_functions/.vscode/launch.json @@ -11,9 +11,14 @@ "mode": "auto", "program": "${workspaceFolder}/cmd/main.go", "env": { - "GCP_PROJECT": "wormhole-315720", - "BIGTABLE_INSTANCE": "wormhole-mainnet", - "GOOGLE_APPLICATION_CREDENTIALS": "./bigtable-admin.json" + "GCP_PROJECT": "local-dev", + "BIGTABLE_INSTANCE": "wormhole", + "BIGTABLE_EMULATOR_HOST": "localhost:8086", + "PUBSUB_EMULATOR_HOST": "localhost:8085", + "PUBSUB_NEW_VAA_TOPIC": "new-vaa-devnet", + "PUBSUB_NEW_VAA_SUBSCRIPTION": "extract-payload-devnet", + "PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC": "create-token-transfer-details-devnet", + "PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION": "calculate-transfer-data-devnet", }, }, ] diff --git a/event_database/cloud_functions/cmd/main.go b/event_database/cloud_functions/cmd/main.go index ffbab0bb0..f5800cc6e 100644 --- a/event_database/cloud_functions/cmd/main.go +++ b/event_database/cloud_functions/cmd/main.go @@ -12,6 +12,39 @@ import ( p "github.com/certusone/wormhole/event_database/cloud_functions" ) +func createAndSubscribe(client *pubsub.Client, topicName, subscriptionName string, handler func(ctx context.Context, m p.PubSubMessage) error) { + var topic *pubsub.Topic + var topicErr error + ctx := context.Background() + topic, topicErr = client.CreateTopic(ctx, topicName) + if topicErr != nil { + log.Printf("pubsub.CreateTopic err: %v", topicErr) + // already exists + topic = client.Topic(topicName) + } else { + log.Println("created topic:", topicName) + } + + subConf := pubsub.SubscriptionConfig{Topic: topic} + _, subErr := client.CreateSubscription(ctx, subscriptionName, subConf) + if subErr != nil { + log.Printf("pubsub.CreateSubscription err: %v", subErr) + } else { + log.Println("created subscription:", subscriptionName) + } + + sub := client.Subscription(subscriptionName) + + err := sub.Receive(ctx, func(ctx context.Context, msg *pubsub.Message) { + msg.Ack() + handler(ctx, p.PubSubMessage{Data: msg.Data}) + + }) + if err != nil { + fmt.Println(fmt.Errorf("receive err: %v", err)) + } +} + func main() { var wg sync.WaitGroup @@ -34,51 +67,34 @@ func main() { }() // pubsub functions + pubsubCtx := context.Background() + gcpProject := os.Getenv("GCP_PROJECT") + + pubsubClient, err := pubsub.NewClient(pubsubCtx, gcpProject) + if err != nil { + fmt.Println(fmt.Errorf("pubsub.NewClient err: %v", err)) + } + wg.Add(1) go func() { defer wg.Done() - pubsubCtx := context.Background() - gcpProject := os.Getenv("GCP_PROJECT") - client, err := pubsub.NewClient(pubsubCtx, gcpProject) - if err != nil { - fmt.Println(fmt.Errorf("pubsub.NewClient err: %v", err)) - } - defer client.Close() + pubsubTopic := os.Getenv("PUBSUB_NEW_VAA_TOPIC") + pubsubSubscription := os.Getenv("PUBSUB_NEW_VAA_SUBSCRIPTION") - pubsubTopic := os.Getenv("PUBSUB_TOPIC") - pubsubSubscription := os.Getenv("PUBSUB_SUBSCRIPTION") - var topic *pubsub.Topic - var topicErr error + createAndSubscribe(pubsubClient, pubsubTopic, pubsubSubscription, p.ProcessVAA) + }() - topic, topicErr = client.CreateTopic(pubsubCtx, pubsubTopic) - if topicErr != nil { - log.Printf("pubsub.CreateTopic err: %v", topicErr) - // already exists - topic = client.Topic(pubsubTopic) - } else { - log.Println("created topic:", pubsubTopic) - } + wg.Add(1) + go func() { + defer wg.Done() - subConf := pubsub.SubscriptionConfig{Topic: topic} - _, subErr := client.CreateSubscription(pubsubCtx, pubsubSubscription, subConf) - if subErr != nil { - log.Printf("pubsub.CreateSubscription err: %v", subErr) - } else { - log.Println("created subscription:", pubsubSubscription) - } + pubsubTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC") + pubsubSubscription := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_SUBSCRIPTION") - sub := client.Subscription(pubsubSubscription) - - err = sub.Receive(pubsubCtx, func(ctx context.Context, msg *pubsub.Message) { - msg.Ack() - p.ProcessVAA(ctx, p.PubSubMessage{Data: msg.Data}) - - }) - if err != nil { - fmt.Println(fmt.Errorf("receive err: %v", err)) - } + createAndSubscribe(pubsubClient, pubsubTopic, pubsubSubscription, p.ProcessTransfer) }() wg.Wait() + pubsubClient.Close() } diff --git a/event_database/cloud_functions/process-transfer.go b/event_database/cloud_functions/process-transfer.go new file mode 100644 index 000000000..df712f110 --- /dev/null +++ b/event_database/cloud_functions/process-transfer.go @@ -0,0 +1,29 @@ +package p + +import ( + "context" + "encoding/json" + "fmt" + "log" + + "github.com/certusone/wormhole/node/pkg/vaa" +) + +// ProcessTransfer is triggered by a PubSub message, once a TokenTransferPayload is written to a row. +func ProcessTransfer(ctx context.Context, m PubSubMessage) error { + data := string(m.Data) + if data == "" { + return fmt.Errorf("no data to process in message") + } + + log.Printf("ProcessTransfer got message!") + signedVaa, err := vaa.Unmarshal(m.Data) + if err != nil { + log.Println("failed Unmarshaling VAA") + return err + } + jsonVaa, _ := json.MarshalIndent(signedVaa, "", " ") + log.Printf("ProcessTransfer Unmarshaled VAA: %q\n", string(jsonVaa)) + + return nil +} diff --git a/event_database/cloud_functions/process-vaa.go b/event_database/cloud_functions/process-vaa.go index e31308297..4fc5f4cf2 100644 --- a/event_database/cloud_functions/process-vaa.go +++ b/event_database/cloud_functions/process-vaa.go @@ -9,6 +9,7 @@ import ( "log" "cloud.google.com/go/bigtable" + "cloud.google.com/go/pubsub" "github.com/certusone/wormhole/node/pkg/vaa" "github.com/holiman/uint256" ) @@ -232,7 +233,7 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { log.Println("failed decoding payload for row ", rowKey) return decodeErr } - // save payload to bigtable + // save payload to bigtable, then publish a new PubSub message for further processing colFam := columnFamilies[2] mutation := bigtable.NewMutation() ts := bigtable.Now() @@ -245,7 +246,13 @@ func ProcessVAA(ctx context.Context, m PubSubMessage) error { mutation.Set(colFam, "TargetChain", ts, []byte(fmt.Sprint(payload.TargetChain))) writeErr := writePayloadToBigTable(ctx, rowKey, colFam, mutation) - return writeErr + if writeErr != nil { + return writeErr + } + + // now that the payload is saved to BigTable, + // pass along the message to the topic that will calculate TokenTransferDetails + pubSubTokenTransferDetailsTopic.Publish(ctx, &pubsub.Message{Data: m.Data}) } else if payloadId == 2 { // asset meta payload, decodeErr := DecodeAssetMeta(signedVaa.Payload) diff --git a/event_database/cloud_functions/shared.go b/event_database/cloud_functions/shared.go index bb2703700..4efdd3074 100644 --- a/event_database/cloud_functions/shared.go +++ b/event_database/cloud_functions/shared.go @@ -9,6 +9,7 @@ import ( "sync" "cloud.google.com/go/bigtable" + "cloud.google.com/go/pubsub" "github.com/certusone/wormhole/node/pkg/vaa" ) @@ -20,6 +21,9 @@ var client *bigtable.Client var clientOnce sync.Once var tbl *bigtable.Table +var pubsubClient *pubsub.Client +var pubSubTokenTransferDetailsTopic *pubsub.Topic + // init runs during cloud function initialization. So, this will only run during an // an instance's cold start. // https://cloud.google.com/functions/docs/bestpractices/networking#accessing_google_apis @@ -36,8 +40,19 @@ func init() { return } + + var pubsubErr error + pubsubClient, pubsubErr = pubsub.NewClient(context.Background(), project) + if pubsubErr != nil { + log.Printf("pubsub.NewClient error: %v", pubsubErr) + return + } }) tbl = client.Open("v2Events") + + // create the topic that will be published to after decoding token transfer payloads + tokenTransferDetailsTopic := os.Getenv("PUBSUB_TOKEN_TRANSFER_DETAILS_TOPIC") + pubSubTokenTransferDetailsTopic = pubsubClient.Topic(tokenTransferDetailsTopic) } var columnFamilies = []string{