node/processor: improve handleObservation performance

This commit is contained in:
tbjump 2023-07-12 02:07:14 +00:00 committed by tbjump
parent 5ca5b53cd8
commit 68f6cf8e6c
2 changed files with 39 additions and 32 deletions

View File

@ -1093,7 +1093,7 @@ func BenchmarkConsensus(b *testing.B) {
//CONSOLE_LOG_LEVEL = zap.DebugLevel //CONSOLE_LOG_LEVEL = zap.DebugLevel
//CONSOLE_LOG_LEVEL = zap.InfoLevel //CONSOLE_LOG_LEVEL = zap.InfoLevel
CONSOLE_LOG_LEVEL = zap.WarnLevel CONSOLE_LOG_LEVEL = zap.WarnLevel
runConsensusBenchmark(b, "1", 19, 1000, 10) // ~10s runConsensusBenchmark(b, "1", 19, 1000, 50) // ~7.9s
//runConsensusBenchmark(b, "1", 19, 1000, 5) // ~10s //runConsensusBenchmark(b, "1", 19, 1000, 5) // ~10s
//runConsensusBenchmark(b, "1", 19, 1000, 1) // ~13s //runConsensusBenchmark(b, "1", 19, 1000, 1) // ~13s
} }

View File

@ -55,15 +55,22 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
m := obs.Msg m := obs.Msg
hash := hex.EncodeToString(m.Hash) hash := hex.EncodeToString(m.Hash)
s := p.state.signatures[hash]
if s != nil && s.settled {
// already settled; ignoring additional signatures for it.
return
}
p.logger.Debug("received observation", if p.logger.Core().Enabled(zapcore.DebugLevel) {
zap.String("digest", hash), p.logger.Debug("received observation",
zap.String("signature", hex.EncodeToString(m.Signature)), zap.String("digest", hash),
zap.String("addr", hex.EncodeToString(m.Addr)), zap.String("signature", hex.EncodeToString(m.Signature)),
zap.String("txhash", hex.EncodeToString(m.TxHash)), zap.String("addr", hex.EncodeToString(m.Addr)),
zap.String("txhash_b58", base58.Encode(m.TxHash)), zap.String("txhash", hex.EncodeToString(m.TxHash)),
zap.String("message_id", m.MessageId), zap.String("txhash_b58", base58.Encode(m.TxHash)),
) zap.String("message_id", m.MessageId),
)
}
observationsReceivedTotal.Inc() observationsReceivedTotal.Inc()
@ -111,8 +118,8 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
// During an update, vaaState.signatures can contain signatures from *both* guardian sets. // During an update, vaaState.signatures can contain signatures from *both* guardian sets.
// //
var gs *node_common.GuardianSet var gs *node_common.GuardianSet
if p.state.signatures[hash] != nil && p.state.signatures[hash].gs != nil { if s != nil && s.gs != nil {
gs = p.state.signatures[hash].gs gs = s.gs
} else { } else {
gs = p.gs gs = p.gs
} }
@ -136,7 +143,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.String("digest", hash), zap.String("digest", hash),
zap.String("their_addr", their_addr.Hex()), zap.String("their_addr", their_addr.Hex()),
zap.Uint32("index", gs.Index), zap.Uint32("index", gs.Index),
zap.Any("keys", gs.KeysAsHexStrings()), //zap.Any("keys", gs.KeysAsHexStrings()),
) )
observationsFailedTotal.WithLabelValues("unknown_guardian").Inc() observationsFailedTotal.WithLabelValues("unknown_guardian").Inc()
return return
@ -150,7 +157,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc() observationsReceivedByGuardianAddressTotal.WithLabelValues(their_addr.Hex()).Inc()
// []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging. // []byte isn't hashable in a map. Paying a small extra cost for encoding for easier debugging.
if p.state.signatures[hash] == nil { if s == nil {
// We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like. // We haven't yet seen this event ourselves, and therefore do not know what the VAA looks like.
// However, we have established that a valid guardian has signed it, and therefore we can // However, we have established that a valid guardian has signed it, and therefore we can
// already start aggregating signatures for it. // already start aggregating signatures for it.
@ -161,25 +168,27 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
observationsUnknownTotal.Inc() observationsUnknownTotal.Inc()
p.state.signatures[hash] = &state{ s = &state{
firstObserved: time.Now(), firstObserved: time.Now(),
nextRetry: time.Now().Add(nextRetryDuration(0)), nextRetry: time.Now().Add(nextRetryDuration(0)),
signatures: map[common.Address][]byte{}, signatures: map[common.Address][]byte{},
source: "unknown", source: "unknown",
} }
p.state.signatures[hash] = s
} }
p.state.signatures[hash].signatures[their_addr] = m.Signature s.signatures[their_addr] = m.Signature
// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
agg := make([]bool, len(gs.Keys)) agg := make([]bool, len(gs.Keys))
var sigs []*vaa.Signature var sigs []*vaa.Signature
for i, a := range gs.Keys { for i, a := range gs.Keys {
s, ok := p.state.signatures[hash].signatures[a] sig, ok := s.signatures[a]
if ok { if ok {
var bs [65]byte var bs [65]byte
if n := copy(bs[:], s); n != 65 { if n := copy(bs[:], sig); n != 65 {
panic(fmt.Sprintf("invalid sig len: %d", n)) panic(fmt.Sprintf("invalid sig len: %d", n))
} }
@ -192,7 +201,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
agg[i] = ok agg[i] = ok
} }
if p.state.signatures[hash].ourObservation != nil { if s.ourObservation != nil {
// We have made this observation on chain! // We have made this observation on chain!
// 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
@ -200,7 +209,7 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
zap.String("digest", hash), zap.String("digest", hash),
zap.Any("set", gs.KeysAsHexStrings()), //zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index), zap.Uint32("index", gs.Index),
zap.Bools("aggregation", agg), zap.Bools("aggregation", agg),
zap.Int("required_sigs", quorum), zap.Int("required_sigs", quorum),
@ -208,8 +217,8 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
zap.Bool("quorum", len(sigs) >= quorum), zap.Bool("quorum", len(sigs) >= quorum),
) )
if len(sigs) >= quorum && !p.state.signatures[hash].submitted { if len(sigs) >= quorum && !s.submitted {
p.state.signatures[hash].ourObservation.HandleQuorum(sigs, hash, p) s.ourObservation.HandleQuorum(sigs, hash, p)
} else { } else {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
zap.String("digest", hash)) zap.String("digest", hash))
@ -242,13 +251,9 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
return return
} }
// Calculate digest for logging
digest := v.SigningDigest()
hash := hex.EncodeToString(digest.Bytes())
if p.gs == nil { if p.gs == nil {
p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet", p.logger.Warn("dropping SignedVAAWithQuorum message since we haven't initialized our guardian set yet",
zap.String("digest", hash), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())),
zap.Any("message", m), zap.Any("message", m),
) )
return return
@ -257,7 +262,7 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
// Check if guardianSet doesn't have any keys // Check if guardianSet doesn't have any keys
if len(p.gs.Keys) == 0 { if len(p.gs.Keys) == 0 {
p.logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys", p.logger.Warn("dropping SignedVAAWithQuorum message since we have a guardian set without keys",
zap.String("digest", hash), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())),
zap.Any("message", m), zap.Any("message", m),
) )
return return
@ -274,11 +279,13 @@ func (p *Processor) handleInboundSignedVAAWithQuorum(ctx context.Context, m *gos
// - enough signatures are present for the VAA to reach quorum // - enough signatures are present for the VAA to reach quorum
// Store signed VAA in database. // Store signed VAA in database.
p.logger.Debug("storing inbound signed VAA with quorum", if p.logger.Level().Enabled(zapcore.DebugLevel) {
zap.String("digest", hash), p.logger.Debug("storing inbound signed VAA with quorum",
zap.Any("vaa", v), zap.String("digest", hex.EncodeToString(v.SigningDigest().Bytes())),
zap.String("bytes", hex.EncodeToString(m.Vaa)), zap.Any("vaa", v),
zap.String("message_id", v.MessageID())) zap.String("bytes", hex.EncodeToString(m.Vaa)),
zap.String("message_id", v.MessageID()))
}
if err := p.storeSignedVAA(v); err != nil { if err := p.storeSignedVAA(v); err != nil {
p.logger.Error("failed to store signed VAA", zap.Error(err)) p.logger.Error("failed to store signed VAA", zap.Error(err))