node: increase buffer sizes

This commit is contained in:
tbjump 2023-07-12 02:27:45 +00:00 committed by tbjump
parent 84d4a834a8
commit 4550dd179d
2 changed files with 23 additions and 10 deletions

View File

@ -19,12 +19,25 @@ import (
) )
const ( const (
inboundObservationBufferSize = 5000 // inboundObservationBufferSize configures the size of the obsvC channel that contains observations from other Guardians.
inboundSignedVaaBufferSize = 50 // One observation takes roughly 0.1ms to process on one core, so the whole queue could be processed in 1s
observationRequestOutboundBufferSize = 50 inboundObservationBufferSize = 10000
observationRequestInboundBufferSize = 50
// observationRequestBufferSize is the buffer size of the per-network reobservation channel // inboundSignedVaaBufferSize configures the size of the signedInC channel that contains VAAs from other Guardians.
observationRequestBufferSize = 25 // One VAA takes roughly 0.01ms to process if we already have one in the database and 2ms if we don't.
// So in the worst case the entire queue can be processed in 2s.
inboundSignedVaaBufferSize = 1000
// observationRequestInboundBufferSize configures the size of obsvReqC.
// Messages from there are immediately sent to the per-chain observation request channels, which are more important to configure.
observationRequestInboundBufferSize = 500
// observationRequestOutboundBufferSize configures the size of obsvReqSendC
// and thereby somewhat limits the amout of observation requests that can be sent in bursts to the network.
observationRequestOutboundBufferSize = 100
// observationRequestPerChainBufferSize is the buffer size of the per-network reobservation channel
observationRequestPerChainBufferSize = 100
) )
type PrometheusCtxKey struct{} type PrometheusCtxKey struct{}
@ -91,8 +104,8 @@ func (g *G) initializeBasic(logger *zap.Logger, rootCtxCancel context.CancelFunc
g.msgC = makeChannelPair[*common.MessagePublication](0) g.msgC = makeChannelPair[*common.MessagePublication](0)
g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup. g.setC = makeChannelPair[*common.GuardianSet](1) // This needs to be a buffered channel because of a circular dependency between processor and accountant during startup.
g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize) g.signedInC = makeChannelPair[*gossipv1.SignedVAAWithQuorum](inboundSignedVaaBufferSize)
g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize) g.obsvReqC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize)
g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestInboundBufferSize) g.obsvReqSendC = makeChannelPair[*gossipv1.ObservationRequest](observationRequestOutboundBufferSize)
g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity) g.acctC = makeChannelPair[*common.MessagePublication](accountant.MsgChannelCapacity)
// Guardian set state managed by processor // Guardian set state managed by processor

View File

@ -316,7 +316,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
common.MustRegisterReadinessSyncing(wc.GetChainID()) common.MustRegisterReadinessSyncing(wc.GetChainID())
} }
chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) chainObsvReqC[wc.GetChainID()] = make(chan *gossipv1.ObservationRequest, observationRequestPerChainBufferSize)
if wc.RequiredL1Finalizer() != "" { if wc.RequiredL1Finalizer() != "" {
l1watcher, ok := watchers[wc.RequiredL1Finalizer()] l1watcher, ok := watchers[wc.RequiredL1Finalizer()]
@ -352,7 +352,7 @@ func GuardianOptionWatchers(watcherConfigs []watchers.WatcherConfig, ibcWatcherC
continue continue
} }
chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestBufferSize) chainObsvReqC[chainID] = make(chan *gossipv1.ObservationRequest, observationRequestPerChainBufferSize)
common.MustRegisterReadinessSyncing(chainID) common.MustRegisterReadinessSyncing(chainID)
chainConfig = append(chainConfig, ibc.ChainConfigEntry{ chainConfig = append(chainConfig, ibc.ChainConfigEntry{