Send re-observation request when re-broadcasting local observations

Currently if an observation hasn't reached quorum within 5 minutes, the
processor will re-broadcast the signed local observation to the other
guardians in the network. However if not enough guardians actually
observed the original tx, then no amount of re-broadcasting will help
the network reach quorum.

Fix this issue by sending a re-observation request whenever we
re-broadcast a signed local observation.  This ensures that any
guardians that missed the tx the first time it happened have a chance to
re-observe it and help the network reach quorum.
This commit is contained in:
Chirantan Ekbote 2022-08-17 15:10:32 +09:00 committed by Chirantan Ekbote
parent 4712a6f774
commit 1753bb34f0
4 changed files with 22 additions and 3 deletions

View File

@ -1102,6 +1102,7 @@ func runNode(cmd *cobra.Command, args []string) {
setC,
sendC,
obsvC,
obsvReqSendC,
injectC,
signedInC,
gk,

View File

@ -59,6 +59,7 @@ func (p *Processor) broadcastSignature(
p.state.signatures[hash].ourObservation = o
p.state.signatures[hash].ourMsg = msg
p.state.signatures[hash].txHash = txhash
p.state.signatures[hash].source = o.GetEmitterChain().String()
p.state.signatures[hash].gs = p.gs // guaranteed to match ourObservation - there's no concurrent access to p.gs

View File

@ -8,6 +8,7 @@ import (
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/notify/discord"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -177,14 +178,22 @@ func (p *Processor) handleCleanup(ctx context.Context) {
case !s.submitted && delta.Minutes() >= 5:
// Poor observation has been unsubmitted for five minutes - clearly, something went wrong.
// If we have previously submitted an observation, we can make another attempt to get it over
// the finish line by rebroadcasting our sig. If we do not have a observation, it means we either never observed it,
// or it got revived by a malfunctioning guardian node, in which case, we can't do anything
// about it and just delete it to keep our state nice and lean.
// the finish line by sending a re-observation request to the network and rebroadcasting our
// sig. If we do not have an observation, it means we either never observed it, or it got
// revived by a malfunctioning guardian node, in which case, we can't do anything about it
// and just delete it to keep our state nice and lean.
if s.ourMsg != nil {
p.logger.Info("resubmitting observation",
zap.String("digest", hash),
zap.Duration("delta", delta),
zap.Uint("retry", s.retryCount))
req := &gossipv1.ObservationRequest{
ChainId: uint32(s.ourObservation.GetEmitterChain()),
TxHash: s.txHash,
}
if err := common.PostObservationRequest(p.obsvReqSendC, req); err != nil {
p.logger.Warn("failed to broadcast re-observation request", zap.Error(err))
}
p.sendC <- s.ourMsg
s.retryCount += 1
aggregationStateRetries.Inc()

View File

@ -55,6 +55,8 @@ type (
retryCount uint
// Copy of the bytes we submitted (ourObservation, but signed and serialized). Used for retransmissions.
ourMsg []byte
// The hash of the transaction in which the observation was made. Used for re-observation requests.
txHash []byte
// Copy of the guardian set valid at observation/injection time.
gs *common.GuardianSet
}
@ -77,6 +79,10 @@ type Processor struct {
sendC chan []byte
// obsvC is a channel of inbound decoded observations from p2p
obsvC chan *gossipv1.SignedObservation
// obsvReqSendC is a send-only channel of outbound re-observation requests to broadcast on p2p
obsvReqSendC chan<- *gossipv1.ObservationRequest
// signedInC is a channel of inbound signed VAA observations from p2p
signedInC chan *gossipv1.SignedVAAWithQuorum
@ -123,6 +129,7 @@ func NewProcessor(
setC chan *common.GuardianSet,
sendC chan []byte,
obsvC chan *gossipv1.SignedObservation,
obsvReqSendC chan<- *gossipv1.ObservationRequest,
injectC chan *vaa.VAA,
signedInC chan *gossipv1.SignedVAAWithQuorum,
gk *ecdsa.PrivateKey,
@ -140,6 +147,7 @@ func NewProcessor(
setC: setC,
sendC: sendC,
obsvC: obsvC,
obsvReqSendC: obsvReqSendC,
signedInC: signedInC,
injectC: injectC,
gk: gk,