Don't allow obs reqs if the queue is full (#1429)

Change-Id: Ifb0d038fa3adeddc6226e2289fe9dbfc8e39b4e7
This commit is contained in:
bruce-riley 2022-08-15 07:33:45 -05:00 committed by GitHub
parent 3f51c89f30
commit 99fb46d549
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 68 additions and 3 deletions

View File

@ -396,7 +396,10 @@ func adminServiceRunnable(logger *zap.Logger, socketPath string, injectC chan<-
} }
func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) { func (s *nodePrivilegedService) SendObservationRequest(ctx context.Context, req *nodev1.SendObservationRequestRequest) (*nodev1.SendObservationRequestResponse, error) {
s.obsvReqSendC <- req.ObservationRequest if err := common.PostObservationRequest(s.obsvReqSendC, req.ObservationRequest); err != nil {
return nil, err
}
s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest)) s.logger.Info("sent observation request", zap.Any("request", req.ObservationRequest))
return &nodev1.SendObservationRequestResponse{}, nil return &nodev1.SendObservationRequestResponse{}, nil
} }

View File

@ -793,10 +793,10 @@ func runNode(cmd *cobra.Command, args []string) {
signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50) signedInC := make(chan *gossipv1.SignedVAAWithQuorum, 50)
// Inbound observation requests from the p2p service (for all chains) // Inbound observation requests from the p2p service (for all chains)
obsvReqC := make(chan *gossipv1.ObservationRequest, 50) obsvReqC := make(chan *gossipv1.ObservationRequest, common.ObsvReqChannelSize)
// Outbound observation requests // Outbound observation requests
obsvReqSendC := make(chan *gossipv1.ObservationRequest) obsvReqSendC := make(chan *gossipv1.ObservationRequest, common.ObsvReqChannelSize)
// Injected VAAs (manually generated rather than created via observation) // Injected VAAs (manually generated rather than created via observation)
injectC := make(chan *vaa.VAA) injectC := make(chan *vaa.VAA)

View File

@ -0,0 +1,19 @@
package common
import (
"fmt"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
)
const ObsvReqChannelSize = 50
const ObsvReqChannelFullError = "channel is full"
func PostObservationRequest(obsvReqSendC chan *gossipv1.ObservationRequest, req *gossipv1.ObservationRequest) error {
if len(obsvReqSendC) >= cap(obsvReqSendC) {
return fmt.Errorf(ObsvReqChannelFullError)
}
obsvReqSendC <- req
return nil
}

View File

@ -0,0 +1,43 @@
package common
import (
"testing"
"time"
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1"
"github.com/certusone/wormhole/node/pkg/vaa"
"github.com/stretchr/testify/assert"
)
func TestObsvReqSendLimitEnforced(t *testing.T) {
obsvReqSendC := make(chan *gossipv1.ObservationRequest, ObsvReqChannelSize)
// If the channel overflows, the write hangs, so use a go routine with a timeout.
done := false
go func() {
// Filling the queue up should work.
for count := 1; count <= ObsvReqChannelSize; count++ {
req := &gossipv1.ObservationRequest{
ChainId: uint32(vaa.ChainIDSolana),
}
err := PostObservationRequest(obsvReqSendC, req)
assert.Nil(t, err)
}
// But one more write should fail.
req := &gossipv1.ObservationRequest{
ChainId: uint32(vaa.ChainIDSolana),
}
err := PostObservationRequest(obsvReqSendC, req)
assert.NotNil(t, err)
assert.Equal(t, ObsvReqChannelFullError, err.Error())
done = true
}()
time.Sleep(time.Second)
// Make sure we didn't hang.
assert.Equal(t, true, done)
}