improve logging around GCP PubSub message path

commit-id:04012992
This commit is contained in:
justinschuldt 2022-01-25 20:41:38 -06:00 committed by Leopold Schabel
parent 07c599ab68
commit 054d800f19
2 changed files with 19 additions and 9 deletions

View File

@ -89,7 +89,7 @@ func (re *AttestationEventReporter) ReportMessagePublication(msg *MessagePublica
case sub.MessagePublicationC <- msg:
re.logger.Debug("published MessagePublication to client", zap.Int("client", client))
default:
re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
re.logger.Error("buffer overrun when attempting to publish message", zap.Int("client", client))
}
}
}
@ -104,7 +104,7 @@ func (re *AttestationEventReporter) ReportVAAQuorum(msg *vaa.VAA) {
case sub.VAAQuorumC <- msg:
re.logger.Debug("published VAAQuorum to client", zap.Int("client", client))
default:
re.logger.Debug("buffer overrun when attempting to publish message", zap.Int("client", client))
re.logger.Error("buffer overrun when attempting to publish message", zap.Int("client", client))
}
}

View File

@ -57,9 +57,14 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
pubsubClient, err := pubsub.NewClient(ctx, e.connectionConfig.GcpProjectID)
if err != nil {
return fmt.Errorf("failed to create PubSub client: %w", err)
logger.Error("failed to create GCP PubSub client", zap.Error(err))
return fmt.Errorf("failed to create GCP PubSub client: %w", err)
}
logger.Info("GCP PubSub.NewClient initialized")
pubsubTopic := pubsubClient.Topic(e.connectionConfig.TopicName)
logger.Info("GCP PubSub.Topic initialized",
zap.String("Topic", e.connectionConfig.TopicName))
// call to subscribe to event channels
sub := e.events.Subscribe()
logger.Info("subscribed to AttestationEvents")
@ -95,7 +100,7 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
rowKey := MakeRowKey(msg.VAA.EmitterChain, msg.VAA.EmitterAddress, msg.VAA.Sequence)
err := tbl.Apply(ctx, rowKey, conditionalMutation)
if err != nil {
logger.Warn("Failed to write message publication to BigTable",
logger.Error("Failed to write message publication to BigTable",
zap.String("rowKey", rowKey),
zap.String("columnFamily", colFam),
zap.Error(err))
@ -105,7 +110,6 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
colFam := "QuorumState"
mutation := bigtable.NewMutation()
ts := bigtable.Now()
// TODO - record signed VAAs from gossip messages.
b, marshalErr := msg.Marshal()
if marshalErr != nil {
@ -122,15 +126,21 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
rowKey := MakeRowKey(msg.EmitterChain, msg.EmitterAddress, msg.Sequence)
err := tbl.Apply(ctx, rowKey, conditionalMutation)
if err != nil {
logger.Warn("Failed to write persistence info to BigTable",
logger.Error("Failed to write persistence info to BigTable",
zap.String("rowKey", rowKey),
zap.String("columnFamily", colFam),
zap.Error(err))
errC <- err
}
pubsubTopic.Publish(ctx, &pubsub.Message{
publishResult := pubsubTopic.Publish(ctx, &pubsub.Message{
Data: []byte(b),
})
_, err = publishResult.Get(ctx)
if err != nil {
logger.Error("Failed getting GCP PubSub publish reciept",
zap.String("rowKey", rowKey),
zap.Error(err))
}
}
}
}()
@ -142,7 +152,7 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
logger.Error("Could not close BigTable client", zap.Error(err))
}
if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
logger.Error("Could not close PubSub client", zap.Error(pubsubErr))
logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
}
return ctx.Err()
case err := <-errC:
@ -155,7 +165,7 @@ func BigTableWriter(events *AttestationEventReporter, connectionConfig *BigTable
logger.Error("Could not close BigTable client", zap.Error(closeErr))
}
if pubsubErr := pubsubClient.Close(); pubsubErr != nil {
logger.Error("Could not close PubSub client", zap.Error(pubsubErr))
logger.Error("Could not close GCP PubSub client", zap.Error(pubsubErr))
}
return err