From 59aa71d039e2737760773265efadc1a14d80b39c Mon Sep 17 00:00:00 2001 From: tbjump Date: Sat, 15 Jul 2023 23:33:54 +0000 Subject: [PATCH] node/proessor: Early return if no quorum (performance improvement) --- node/pkg/node/node_test.go | 2 +- node/pkg/processor/observation.go | 95 +++++++++++++++++++------------ 2 files changed, 61 insertions(+), 36 deletions(-) diff --git a/node/pkg/node/node_test.go b/node/pkg/node/node_test.go index 79bdf7243..2cf2229ae 100644 --- a/node/pkg/node/node_test.go +++ b/node/pkg/node/node_test.go @@ -1093,7 +1093,7 @@ func BenchmarkConsensus(b *testing.B) { //CONSOLE_LOG_LEVEL = zap.DebugLevel //CONSOLE_LOG_LEVEL = zap.InfoLevel CONSOLE_LOG_LEVEL = zap.WarnLevel - runConsensusBenchmark(b, "1", 19, 1000, 50) // ~7.9s + runConsensusBenchmark(b, "1", 19, 1000, 50) // ~7.5s //runConsensusBenchmark(b, "1", 19, 1000, 5) // ~10s //runConsensusBenchmark(b, "1", 19, 1000, 1) // ~13s } diff --git a/node/pkg/processor/observation.go b/node/pkg/processor/observation.go index e855019ab..eddfa22d4 100644 --- a/node/pkg/processor/observation.go +++ b/node/pkg/processor/observation.go @@ -45,6 +45,33 @@ var ( }) ) +// signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature (VAA format) given a set of keys gsKeys +// It also returns a bool array indicating which key in gsKeys had a signature +// The processor state format is used for effeciently storing signatures during aggregation while the VAA format is more efficient for on-chain verification. +func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) { + // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. + agg := make([]bool, len(gsKeys)) + var sigs []*vaa.Signature + for i, a := range gsKeys { + sig, ok := signatures[a] + + if ok { + var bs [65]byte + if n := copy(bs[:], sig); n != 65 { + panic(fmt.Sprintf("invalid sig len: %d", n)) + } + + sigs = append(sigs, &vaa.Signature{ + Index: uint8(i), + Signature: bs, + }) + } + + agg[i] = ok + } + return sigs, agg +} + // handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum, // and assembles and submits a valid VAA if possible. func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) { @@ -180,53 +207,51 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW s.signatures[their_addr] = m.Signature - // Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA. - agg := make([]bool, len(gs.Keys)) - var sigs []*vaa.Signature - for i, a := range gs.Keys { - sig, ok := s.signatures[a] - - if ok { - var bs [65]byte - if n := copy(bs[:], sig); n != 65 { - panic(fmt.Sprintf("invalid sig len: %d", n)) - } - - sigs = append(sigs, &vaa.Signature{ - Index: uint8(i), - Signature: bs, - }) - } - - agg[i] = ok - } - if s.ourObservation != nil { // We have made this observation on chain! - // 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA. quorum := vaa.CalculateQuorum(len(gs.Keys)) - p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian - zap.String("digest", hash), - //zap.Any("set", gs.KeysAsHexStrings()), - zap.Uint32("index", gs.Index), - zap.Bools("aggregation", agg), - zap.Int("required_sigs", quorum), - zap.Int("have_sigs", len(sigs)), - zap.Bool("quorum", len(sigs) >= quorum), - ) + // Check if we have more signatures than required for quorum. + // s.signatures may contain signatures from multiple guardian sets during guardian set updates + // Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation, + // but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set. + // We will later check for quorum again after assembling the VAA for a particular guardian set. + if len(s.signatures) < quorum { + // no quorum yet, we're done here + p.logger.Debug("quorum not yet met", + zap.String("digest", hash), + zap.String("messageId", m.MessageId), + ) + return + } - if len(sigs) >= quorum && !s.submitted { - s.ourObservation.HandleQuorum(sigs, hash, p) + // Now we *may* have quorum, depending on the guardian set in use. + // Let's construct the VAA and check if we actually have quorum. + sigsVaaFormat, agg := signaturesToVaaFormat(s.signatures, gs.Keys) + + if p.logger.Level().Enabled(zapcore.DebugLevel) { + p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian + zap.String("digest", hash), + zap.Any("set", gs.KeysAsHexStrings()), + zap.Uint32("index", gs.Index), + zap.Bools("aggregation", agg), + zap.Int("required_sigs", quorum), + zap.Int("have_sigs", len(sigsVaaFormat)), + zap.Bool("quorum", len(sigsVaaFormat) >= quorum), + ) + } + + if len(sigsVaaFormat) >= quorum && !s.submitted { + // we have reached quorum *with the active guardian set* + s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p) } else { p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian zap.String("digest", hash)) } } else { p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian - zap.String("digest", hash), - zap.Bools("aggregation", agg)) + zap.String("digest", hash)) }