From 054d800f19bd438f683da2ba87b45bf71e1c9677 Mon Sep 17 00:00:00 2001 From: justinschuldt Date: Tue, 25 Jan 2022 20:41:38 -0600 Subject: [PATCH] improve logging around GCP PubSub message path commit-id:04012992 --- node/pkg/reporter/attestation_events.go | 4 ++-- node/pkg/reporter/bigtablewriter.go | 24 +++++++++++++++++------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/node/pkg/reporter/attestation_events.go b/node/pkg/reporter/attestation_events.go index 9ca711ac..866db564 100644 --- a/node/pkg/reporter/attestation_events.go +++ b/node/pkg/reporter/attestation_events.go @@ -89,7 +89,7 @@ func (re *AttestationEventReporter) ReportMessagePublication(msg *MessagePublica case sub.MessagePublicationC <- msg: re.logger.Debug("published MessagePublication to client", zap.Int("client", client)) default: - re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client)) + re.logger.Error("buffer overrun when attempting to publish message", zap.Int("client", client)) } } } @@ -104,7 +104,7 @@ func (re *AttestationEventReporter) ReportVAAQuorum(msg *vaa.VAA) { case sub.VAAQuorumC <- msg: re.logger.Debug("published VAAQuorum to client", zap.Int("client", client)) default: - re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client)) + re.logger.Error("buffer overrun when attempting to publish message", zap.Int("client", client)) } } diff --git a/node/pkg/reporter/bigtablewriter.go b/node/pkg/reporter/bigtablewriter.go index b553f66a..7d52004c 100644 --- a/node/pkg/reporter/bigtablewriter.go +++ b/node/pkg/reporter/bigtablewriter.go @@ -57,9 +57,14 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable pubsubClient, err := pubsub.NewClient(ctx, e.connectionConfig.GcpProjectID) if err != nil { - return fmt.Errorf("failed to create PubSub client: %w", err) + logger.Error("failed to create GCP PubSub client", zap.Error(err)) + return fmt.Errorf("failed to create GCP PubSub client: %w", err) } + logger.Info("GCP PubSub.NewClient initialized") + pubsubTopic := pubsubClient.Topic(e.connectionConfig.TopicName) + logger.Info("GCP PubSub.Topic initialized", + zap.String("Topic", e.connectionConfig.TopicName)) // call to subscribe to event channels sub := e.events.Subscribe() logger.Info("subscribed to AttestationEvents") @@ -95,7 +100,7 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable rowKey := MakeRowKey(msg.VAA.EmitterChain, msg.VAA.EmitterAddress, msg.VAA.Sequence) err := tbl.Apply(ctx, rowKey, conditionalMutation) if err != nil { - logger.Warn("Failed to write message publication to BigTable", + logger.Error("Failed to write message publication to BigTable", zap.String("rowKey", rowKey), zap.String("columnFamily", colFam), zap.Error(err)) @@ -105,7 +110,6 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable colFam := "QuorumState" mutation := bigtable.NewMutation() ts := bigtable.Now() - // TODO - record signed VAAs from gossip messages. b, marshalErr := msg.Marshal() if marshalErr != nil { @@ -122,15 +126,21 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable rowKey := MakeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence) err := tbl.Apply(ctx, rowKey, conditionalMutation) if err != nil { - logger.Warn("Failed to write persistence info to BigTable", + logger.Error("Failed to write persistence info to BigTable", zap.String("rowKey", rowKey), zap.String("columnFamily", colFam), zap.Error(err)) errC <- err } - pubsubTopic.Publish(ctx, &pubsub.Message{ + publishResult := pubsubTopic.Publish(ctx, &pubsub.Message{ Data: []byte(b), }) + _, err = publishResult.Get(ctx) + if err != nil { + logger.Error("Failed getting GCP PubSub publish reciept", + zap.String("rowKey", rowKey), + zap.Error(err)) + } } } }() @@ -142,7 +152,7 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable logger.Error("Could not close BigTable client", zap.Error(err)) } if pubsubErr := pubsubClient.Close(); pubsubErr != nil { - logger.Error("Could not close PubSub client", zap.Error(pubsubErr)) + logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr)) } return ctx.Err() case err := <-errC: @@ -155,7 +165,7 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable logger.Error("Could not close BigTable client", zap.Error(closeErr)) } if pubsubErr := pubsubClient.Close(); pubsubErr != nil { - logger.Error("Could not close PubSub client", zap.Error(pubsubErr)) + logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr)) } return err