node/proessor: Early return if no quorum (performance improvement)

This commit is contained in:
tbjump 2023-07-15 23:33:54 +00:00 committed by tbjump
parent 68f6cf8e6c
commit 59aa71d039
2 changed files with 61 additions and 36 deletions

View File

@ -1093,7 +1093,7 @@ func BenchmarkConsensus(b *testing.B) {
//CONSOLE_LOG_LEVEL = zap.DebugLevel
//CONSOLE_LOG_LEVEL = zap.InfoLevel
CONSOLE_LOG_LEVEL = zap.WarnLevel
runConsensusBenchmark(b, "1", 19, 1000, 50) // ~7.9s
runConsensusBenchmark(b, "1", 19, 1000, 50) // ~7.5s
//runConsensusBenchmark(b, "1", 19, 1000, 5) // ~10s
//runConsensusBenchmark(b, "1", 19, 1000, 1) // ~13s
}

View File

@ -45,6 +45,33 @@ var (
})
)
// signaturesToVaaFormat converts a map[common.Address][]byte (processor state format) to []*vaa.Signature (VAA format) given a set of keys gsKeys
// It also returns a bool array indicating which key in gsKeys had a signature
// The processor state format is used for effeciently storing signatures during aggregation while the VAA format is more efficient for on-chain verification.
func signaturesToVaaFormat(signatures map[common.Address][]byte, gsKeys []common.Address) ([]*vaa.Signature, []bool) {
// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
agg := make([]bool, len(gsKeys))
var sigs []*vaa.Signature
for i, a := range gsKeys {
sig, ok := signatures[a]
if ok {
var bs [65]byte
if n := copy(bs[:], sig); n != 65 {
panic(fmt.Sprintf("invalid sig len: %d", n))
}
sigs = append(sigs, &vaa.Signature{
Index: uint8(i),
Signature: bs,
})
}
agg[i] = ok
}
return sigs, agg
}
// handleObservation processes a remote VAA observation, verifies it, checks whether the VAA has met quorum,
// and assembles and submits a valid VAA if possible.
func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgWithTimeStamp[gossipv1.SignedObservation]) {
@ -180,53 +207,51 @@ func (p *Processor) handleObservation(ctx context.Context, obs *node_common.MsgW
s.signatures[their_addr] = m.Signature
// Aggregate all valid signatures into a list of vaa.Signature and construct signed VAA.
agg := make([]bool, len(gs.Keys))
var sigs []*vaa.Signature
for i, a := range gs.Keys {
sig, ok := s.signatures[a]
if ok {
var bs [65]byte
if n := copy(bs[:], sig); n != 65 {
panic(fmt.Sprintf("invalid sig len: %d", n))
}
sigs = append(sigs, &vaa.Signature{
Index: uint8(i),
Signature: bs,
})
}
agg[i] = ok
}
if s.ourObservation != nil {
// We have made this observation on chain!
// 2/3+ majority required for VAA to be valid - wait until we have quorum to submit VAA.
quorum := vaa.CalculateQuorum(len(gs.Keys))
p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
zap.String("digest", hash),
//zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Bools("aggregation", agg),
zap.Int("required_sigs", quorum),
zap.Int("have_sigs", len(sigs)),
zap.Bool("quorum", len(sigs) >= quorum),
)
// Check if we have more signatures than required for quorum.
// s.signatures may contain signatures from multiple guardian sets during guardian set updates
// Hence, if len(s.signatures) < quorum, then there is definitely no quorum and we can return early to save additional computation,
// but if len(s.signatures) >= quorum, there is not necessarily quorum for the active guardian set.
// We will later check for quorum again after assembling the VAA for a particular guardian set.
if len(s.signatures) < quorum {
// no quorum yet, we're done here
p.logger.Debug("quorum not yet met",
zap.String("digest", hash),
zap.String("messageId", m.MessageId),
)
return
}
if len(sigs) >= quorum && !s.submitted {
s.ourObservation.HandleQuorum(sigs, hash, p)
// Now we *may* have quorum, depending on the guardian set in use.
// Let's construct the VAA and check if we actually have quorum.
sigsVaaFormat, agg := signaturesToVaaFormat(s.signatures, gs.Keys)
if p.logger.Level().Enabled(zapcore.DebugLevel) {
p.logger.Debug("aggregation state for observation", // 1.3M out of 3M info messages / hour / guardian
zap.String("digest", hash),
zap.Any("set", gs.KeysAsHexStrings()),
zap.Uint32("index", gs.Index),
zap.Bools("aggregation", agg),
zap.Int("required_sigs", quorum),
zap.Int("have_sigs", len(sigsVaaFormat)),
zap.Bool("quorum", len(sigsVaaFormat) >= quorum),
)
}
if len(sigsVaaFormat) >= quorum && !s.submitted {
// we have reached quorum *with the active guardian set*
s.ourObservation.HandleQuorum(sigsVaaFormat, hash, p)
} else {
p.logger.Debug("quorum not met or already submitted, doing nothing", // 1.2M out of 3M info messages / hour / guardian
zap.String("digest", hash))
}
} else {
p.logger.Debug("we have not yet seen this observation - temporarily storing signature", // 175K out of 3M info messages / hour / guardian
zap.String("digest", hash),
zap.Bools("aggregation", agg))
zap.String("digest", hash))
}