node/near: add timeouts to tests

This commit is contained in:
tbjump 2023-05-23 22:32:45 +00:00 committed by tbjump
parent 1937691afe
commit f52b123586
1 changed files with 64 additions and 35 deletions

View File

@ -4,6 +4,7 @@ import (
"context"
"crypto/sha256"
"encoding/hex"
"fmt"
"net/http/httptest"
"os"
"testing"
@ -26,6 +27,11 @@ const (
type (
testCase struct {
// meta
name string
t *testing.T
timeout time.Duration
// configuration
wormholeContract string
upstreamHost string // e.g. "https://rpc.mainnet.near.org"
@ -35,8 +41,7 @@ type (
expectedMsgObserved []*common.MessagePublication
expectedMsgReObserved []*common.MessagePublication
// storage
t *testing.T
// doneC is written to by the goroutine of testCase.run to communicate its result to the parent testCase.setupAndRun
doneC chan error
}
)
@ -89,11 +94,13 @@ func (testCase *testCase) run(ctx context.Context) error {
// Run the mock server
mockServer := mockserver.NewForwardingCachingServer(logger, testCase.upstreamHost, testCase.cacheDir, testCase.latestFinalBlocks)
mockHttpServer := httptest.NewServer(mockServer)
defer mockHttpServer.Close()
// Setup a watcher
msgC := make(chan *common.MessagePublication)
obsvReqC := make(chan *gossipv1.ObservationRequest)
w := NewWatcher(mockHttpServer.URL, testCase.wormholeContract, msgC, obsvReqC, true)
obsvReqC := make(chan *gossipv1.ObservationRequest, 50)
mainnet := true // we set mainnet to true because the testdata was collected on mainnet.
w := NewWatcher(mockHttpServer.URL, testCase.wormholeContract, msgC, obsvReqC, mainnet)
// Run the watcher
if err := supervisor.Run(ctx, "nearwatch", w.Run); err != nil {
@ -109,11 +116,16 @@ func (testCase *testCase) run(ctx context.Context) error {
}
for i := 0; i < len(expectedMsgObserved); i++ {
msg := <-msgC
assert.Contains(testCase.t, expectedMsgObserved, msg.MessageIDString(), "unexpected message: %v", msg)
assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].seen, false, "already observed message: %v", msg)
assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].MessagePublication, msg)
expectedMsgObserved[msg.MessageIDString()].seen = true
select {
case msg := <-msgC:
assert.Contains(testCase.t, expectedMsgObserved, msg.MessageIDString(), "unexpected message: %v", msg)
assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].seen, false, "already observed message: %v", msg)
assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].MessagePublication, msg)
expectedMsgObserved[msg.MessageIDString()].seen = true
case <-ctx.Done():
testCase.doneC <- fmt.Errorf("parent ctx cancel/timeout while waiting for normal messages")
return ctx.Err()
}
}
for publication, b := range expectedMsgObserved {
@ -122,9 +134,15 @@ func (testCase *testCase) run(ctx context.Context) error {
}
}
// feed in the observation requests
// feed in the re-observation requests.
for k := range testCase.obsvReq {
obsvReqC <- &testCase.obsvReq[k]
select {
case obsvReqC <- &testCase.obsvReq[k]:
default:
err := fmt.Errorf("obsvReqC is full")
testCase.doneC <- err
return err
}
}
// assert that messages were re-observed correctly...
@ -134,11 +152,16 @@ func (testCase *testCase) run(ctx context.Context) error {
}
for i := 0; i < len(expectedMsgReObserved); i++ {
msg := <-msgC
assert.Contains(testCase.t, expectedMsgReObserved, msg.MessageIDString(), "unexpected message: %v", msg)
assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].seen, false, "already reobserved message: %v", msg)
assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].MessagePublication, msg)
expectedMsgReObserved[msg.MessageIDString()].seen = true
select {
case msg := <-msgC:
assert.Contains(testCase.t, expectedMsgReObserved, msg.MessageIDString(), "unexpected message: %v", msg)
assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].seen, false, "already reobserved message: %v", msg)
assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].MessagePublication, msg)
expectedMsgReObserved[msg.MessageIDString()].seen = true
case <-ctx.Done():
testCase.doneC <- fmt.Errorf("parent ctx cancel/timeout while waiting for re-observed messages")
return ctx.Err()
}
}
for publication, b := range expectedMsgReObserved {
@ -147,8 +170,6 @@ func (testCase *testCase) run(ctx context.Context) error {
}
}
println("reobserved messages ok")
// there should be no messages left now
assert.Equal(testCase.t, len(msgC), 0)
@ -158,22 +179,23 @@ func (testCase *testCase) run(ctx context.Context) error {
return nil
}
func (testCase *testCase) setupAndRun(logger *zap.Logger, timeout time.Duration) {
func (testCase *testCase) setupAndRun(logger *zap.Logger) {
// Setup context (with timeout) and logger
rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), timeout)
rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testCase.timeout)
defer rootCtxCancel()
testCase.doneC = make(chan error, 1)
// run the test
supervisor.New(rootCtx, logger, func(ctx context.Context) error {
if err := supervisor.Run(ctx, "test", testCase.run); err != nil {
if err := supervisor.Run(ctx, "near-"+testCase.name, testCase.run); err != nil {
assert.NoError(testCase.t, err)
return err
}
supervisor.Signal(ctx, supervisor.SignalHealthy)
<-rootCtx.Done()
supervisor.Signal(ctx, supervisor.SignalDone)
return nil
}, supervisor.WithPropagatePanic)
@ -193,13 +215,14 @@ func (testCase *testCase) setupAndRun(logger *zap.Logger, timeout time.Duration)
// TestWatcherSimple() tests the most simple case: "final" API only retruns one block which contains a Wormhole transaction. No re-observation requests.
func TestWatcherSimple(t *testing.T) {
timeout := time.Second * 2
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8")
tc := testCase{
name: "TestWatcherSimple",
timeout: time.Second * 2,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -222,19 +245,20 @@ func TestWatcherSimple(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
// TestWatcherReobservation() tests the simple re-observation case: The "final" endpoint returns
// the same unrelated block and there is a re-observation request for past data.
func TestWatcherReobservation(t *testing.T) {
timeout := time.Second * 5
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8")
tc := testCase{
name: "TestWatcherReobservation",
timeout: time.Second * 5,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -263,19 +287,20 @@ func TestWatcherReobservation(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
// TestWatcherDelayedFinal() tests the case where a block cannot be finalized by a parent having it as
// last_final_block and instead needs to be finalized by having it observed as finalized during polling
func TestWatcherSimple2(t *testing.T) {
timeout := time.Second * 2
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8")
tc := testCase{
name: "TestWatcherSimple2",
timeout: time.Second * 2,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -305,19 +330,20 @@ func TestWatcherSimple2(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
// TestWatcherDelayedFinal() tests the case where a block cannot be finalized by a parent having it as
// last_final_block and instead needs to be finalized by having it observed as finalized during polling
func TestWatcherDelayedFinal(t *testing.T) {
timeout := time.Second * 2
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8")
tc := testCase{
name: "TestWatcherDelayedFinal",
timeout: time.Second * 2,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -347,20 +373,21 @@ func TestWatcherDelayedFinal(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
// TestWatcherDelayedFinalAndGaps() tests the case where a block cannot be finalized by a parent having it as
// last_final_block and instead needs to be finalized by having it observed as finalized during polling
// additionally, there is a large gap between polls
func TestWatcherDelayedFinalAndGaps(t *testing.T) {
timeout := time.Second * 2
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8")
tc := testCase{
name: "TestWatcherDelayedFinalAndGaps",
timeout: time.Second * 2,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -385,7 +412,7 @@ func TestWatcherDelayedFinalAndGaps(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
// TestWatcherSynthetic(): Case where there are three wormhole messages. Test data is generated (not real)
@ -400,12 +427,13 @@ func TestWatcherDelayedFinalAndGaps(t *testing.T) {
"6eCgeVSC4Hwm8tAVy4qNQpnLs4S9EpzRjGtAipwZ632A", // 76538236 block 7: tx3 receipt
*/
func TestWatcherSynthetic(t *testing.T) {
timeout := time.Second * 2
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
tc := testCase{
name: "TestWatcherSynthetic",
timeout: time.Second * 2,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -474,7 +502,7 @@ func TestWatcherSynthetic(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
// TestWatcherUnfinalized(): Same as synthetic, but one of the blocks is not finalized and that message has to be excluded.
@ -492,12 +520,13 @@ func TestWatcherSynthetic(t *testing.T) {
"6eCgeVSC4Hwm8tAVy4qNQpnLs4S9EpzRjGtAipwZ632A", // 76538236 block 7: tx3 receipt
*/
func TestWatcherUnfinalized(t *testing.T) {
timeout := time.Second * 2
logger, _ := zap.NewDevelopment()
pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000")
tc := testCase{
name: "TestWatcherUnfinalized",
timeout: time.Second * 2,
t: t,
wormholeContract: WORMHOLE_CONTRACT,
upstreamHost: "",
@ -555,7 +584,7 @@ func TestWatcherUnfinalized(t *testing.T) {
},
}
tc.setupAndRun(logger, timeout)
tc.setupAndRun(logger)
}
func TestSuccessValueToInt(t *testing.T) {