Upsert observation directly to Mongo
Co-authored-by: walker-16 <agpazos85@gmail.com>
This commit is contained in:
parent
e06057acfd
commit
ac68907eb2
|
@ -113,22 +113,15 @@ func (c *observationGossipConsumer) process(ctx context.Context, o *gossipv1.Sig
|
||||||
go func(consumer *observationGossipConsumer, ctx context.Context, obs *gossipv1.SignedObservation) {
|
go func(consumer *observationGossipConsumer, ctx context.Context, obs *gossipv1.SignedObservation) {
|
||||||
err = consumer.txHashStore.SetObservation(ctx, obs)
|
err = consumer.txHashStore.SetObservation(ctx, obs)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
consumer.logger.Error("Error setting txHash", zap.String("id", o.MessageId), zap.Error(err))
|
consumer.logger.Error("Error setting txHash", zap.String("id", obs.MessageId), zap.Error(err))
|
||||||
}
|
|
||||||
}(c, ctx, o)
|
|
||||||
|
|
||||||
go func(consumer *observationGossipConsumer, ctx context.Context, obs *gossipv1.SignedObservation) {
|
|
||||||
err = c.observationProcess(ctx, o)
|
|
||||||
if err != nil {
|
|
||||||
c.logger.Error("Error processing observation", zap.String("id", o.MessageId), zap.Error(err))
|
|
||||||
// This is the fallback to store the observation in the repository.
|
|
||||||
err = consumer.repository.UpsertObservation(ctx, obs, false)
|
|
||||||
if err != nil {
|
|
||||||
consumer.logger.Error("Error inserting observation in repository", zap.String("id", o.MessageId), zap.Error(err))
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}(c, ctx, o)
|
}(c, ctx, o)
|
||||||
|
|
||||||
|
// This is the fallback to store the observation in the repository.
|
||||||
|
err = c.repository.UpsertObservation(ctx, o, false)
|
||||||
|
if err != nil {
|
||||||
|
c.logger.Error("Error inserting observation in repository", zap.String("id", o.MessageId), zap.Error(err))
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (c *observationGossipConsumer) verifyObservation(obs *gossipv1.SignedObservation) bool {
|
func (c *observationGossipConsumer) verifyObservation(obs *gossipv1.SignedObservation) bool {
|
||||||
|
@ -151,7 +144,7 @@ func (c *observationGossipConsumer) verifyObservation(obs *gossipv1.SignedObserv
|
||||||
|
|
||||||
_, isFromGuardian := c.gst.Get().KeyIndex(theirAddr)
|
_, isFromGuardian := c.gst.Get().KeyIndex(theirAddr)
|
||||||
if !isFromGuardian {
|
if !isFromGuardian {
|
||||||
c.logger.Error("error validating observation, signer not in guardian set",
|
c.logger.Debug("error validating observation, signer not in guardian set",
|
||||||
zap.String("id", obs.MessageId),
|
zap.String("id", obs.MessageId),
|
||||||
zap.String("obs_addr", theirAddr.Hex()),
|
zap.String("obs_addr", theirAddr.Hex()),
|
||||||
)
|
)
|
||||||
|
|
Loading…
Reference in New Issue