From 4550dd179d71ac0849c4ebec6a63abbb72a34967 Mon Sep 17 00:00:00 2001 From: tbjump Date: Wed, 12 Jul 2023 02:27:45 +0000 Subject: [PATCH] node: increase buffer sizes --- node/pkg/node/node.go | 29 +++++++++++++++++++++-------- node/pkg/node/options.go | 4 ++-- 2 files changed, 23 insertions(+), 10 deletions(-) diff --git a/node/pkg/node/node.go b/node/pkg/node/node.go index b1a78407a..f0f1a7f40 100644 --- a/node/pkg/node/node.go +++ b/node/pkg/node/node.go @@ -19,12 +19,25 @@ import ( ) const ( - inboundObservationBufferSize = 5000 - inboundSignedVaaBufferSize = 50 - observationRequestOutboundBufferSize = 50 - observationRequestInboundBufferSize = 50 - // observationRequestBufferSize is the buffer size of the per-network reobservation channel - observationRequestBufferSize = 25 + // inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians. + // One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s + inboundObservationBufferSize = 10000 + + // inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians. + // One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't. + // So in the worst case the entire queue can be processed in 2s. + inboundSignedVaaBufferSize = 1000 + + // observationRequestInboundBufferSize configures the size of obsvReqC. + // Messages from there are immediately sent to the per-chain observation request channels, which are more important to configure. + observationRequestInboundBufferSize = 500 + + // observationRequestOutboundBufferSize configures the size of obsvReqSendC + // and thereby somewhat limits the amout of observation requests that can be sent in bursts to the network. + observationRequestOutboundBufferSize = 100 + + // observationRequestPerChainBufferSize is the buffer size of the per-network reobservation channel + observationRequestPerChainBufferSize = 100 ) type PrometheusCtxKey struct{} @@ -91,8 +104,8 @@ func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc g.msgC = makeChannelPair[*common.MessagePublication](0) g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup. g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize) - g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize) - g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize) + g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize) + g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize) g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity) // Guardian set state managed by processor diff --git a/node/pkg/node/options.go b/node/pkg/node/options.go index 2ce0b3f68..7d9da09f5 100644 --- a/node/pkg/node/options.go +++ b/node/pkg/node/options.go @@ -316,7 +316,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC common.MustRegisterReadinessSyncing(wc.GetChainID()) } - chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestPerChainBufferSize) if wc.RequiredL1Finalizer() != "" { l1watcher, ok := watchers[wc.RequiredL1Finalizer()] @@ -352,7 +352,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC continue } - chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) + chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestPerChainBufferSize) common.MustRegisterReadinessSyncing(chainID) chainConfig = append(chainConfig, ibc.ChainConfigEntry{