diff --git a/node/pkg/watchers/near/watcher_test.go b/node/pkg/watchers/near/watcher_test.go index df8e672ac..b6b60b291 100644 --- a/node/pkg/watchers/near/watcher_test.go +++ b/node/pkg/watchers/near/watcher_test.go @@ -4,6 +4,7 @@ import ( "context" "crypto/sha256" "encoding/hex" + "fmt" "net/http/httptest" "os" "testing" @@ -26,6 +27,11 @@ const ( type ( testCase struct { + // meta + name string + t *testing.T + timeout time.Duration + // configuration wormholeContract string upstreamHost string // e.g. "https://rpc.mainnet.near.org" @@ -35,8 +41,7 @@ type ( expectedMsgObserved []*common.MessagePublication expectedMsgReObserved []*common.MessagePublication - // storage - t *testing.T + // doneC is written to by the goroutine of testCase.run to communicate its result to the parent testCase.setupAndRun doneC chan error } ) @@ -89,11 +94,13 @@ func (testCase *testCase) run(ctx context.Context) error { // Run the mock server mockServer := mockserver.NewForwardingCachingServer(logger, testCase.upstreamHost, testCase.cacheDir, testCase.latestFinalBlocks) mockHttpServer := httptest.NewServer(mockServer) + defer mockHttpServer.Close() // Setup a watcher msgC := make(chan *common.MessagePublication) - obsvReqC := make(chan *gossipv1.ObservationRequest) - w := NewWatcher(mockHttpServer.URL, testCase.wormholeContract, msgC, obsvReqC, true) + obsvReqC := make(chan *gossipv1.ObservationRequest, 50) + mainnet := true // we set mainnet to true because the testdata was collected on mainnet. + w := NewWatcher(mockHttpServer.URL, testCase.wormholeContract, msgC, obsvReqC, mainnet) // Run the watcher if err := supervisor.Run(ctx, "nearwatch", w.Run); err != nil { @@ -109,11 +116,16 @@ func (testCase *testCase) run(ctx context.Context) error { } for i := 0; i < len(expectedMsgObserved); i++ { - msg := <-msgC - assert.Contains(testCase.t, expectedMsgObserved, msg.MessageIDString(), "unexpected message: %v", msg) - assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].seen, false, "already observed message: %v", msg) - assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].MessagePublication, msg) - expectedMsgObserved[msg.MessageIDString()].seen = true + select { + case msg := <-msgC: + assert.Contains(testCase.t, expectedMsgObserved, msg.MessageIDString(), "unexpected message: %v", msg) + assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].seen, false, "already observed message: %v", msg) + assert.Equal(testCase.t, expectedMsgObserved[msg.MessageIDString()].MessagePublication, msg) + expectedMsgObserved[msg.MessageIDString()].seen = true + case <-ctx.Done(): + testCase.doneC <- fmt.Errorf("parent ctx cancel/timeout while waiting for normal messages") + return ctx.Err() + } } for publication, b := range expectedMsgObserved { @@ -122,9 +134,15 @@ func (testCase *testCase) run(ctx context.Context) error { } } - // feed in the observation requests + // feed in the re-observation requests. for k := range testCase.obsvReq { - obsvReqC <- &testCase.obsvReq[k] + select { + case obsvReqC <- &testCase.obsvReq[k]: + default: + err := fmt.Errorf("obsvReqC is full") + testCase.doneC <- err + return err + } } // assert that messages were re-observed correctly... @@ -134,11 +152,16 @@ func (testCase *testCase) run(ctx context.Context) error { } for i := 0; i < len(expectedMsgReObserved); i++ { - msg := <-msgC - assert.Contains(testCase.t, expectedMsgReObserved, msg.MessageIDString(), "unexpected message: %v", msg) - assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].seen, false, "already reobserved message: %v", msg) - assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].MessagePublication, msg) - expectedMsgReObserved[msg.MessageIDString()].seen = true + select { + case msg := <-msgC: + assert.Contains(testCase.t, expectedMsgReObserved, msg.MessageIDString(), "unexpected message: %v", msg) + assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].seen, false, "already reobserved message: %v", msg) + assert.Equal(testCase.t, expectedMsgReObserved[msg.MessageIDString()].MessagePublication, msg) + expectedMsgReObserved[msg.MessageIDString()].seen = true + case <-ctx.Done(): + testCase.doneC <- fmt.Errorf("parent ctx cancel/timeout while waiting for re-observed messages") + return ctx.Err() + } } for publication, b := range expectedMsgReObserved { @@ -147,8 +170,6 @@ func (testCase *testCase) run(ctx context.Context) error { } } - println("reobserved messages ok") - // there should be no messages left now assert.Equal(testCase.t, len(msgC), 0) @@ -158,22 +179,23 @@ func (testCase *testCase) run(ctx context.Context) error { return nil } -func (testCase *testCase) setupAndRun(logger *zap.Logger, timeout time.Duration) { +func (testCase *testCase) setupAndRun(logger *zap.Logger) { // Setup context (with timeout) and logger - rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), timeout) + rootCtx, rootCtxCancel := context.WithTimeout(context.Background(), testCase.timeout) defer rootCtxCancel() testCase.doneC = make(chan error, 1) // run the test supervisor.New(rootCtx, logger, func(ctx context.Context) error { - if err := supervisor.Run(ctx, "test", testCase.run); err != nil { + if err := supervisor.Run(ctx, "near-"+testCase.name, testCase.run); err != nil { assert.NoError(testCase.t, err) return err } supervisor.Signal(ctx, supervisor.SignalHealthy) <-rootCtx.Done() + supervisor.Signal(ctx, supervisor.SignalDone) return nil }, supervisor.WithPropagatePanic) @@ -193,13 +215,14 @@ func (testCase *testCase) setupAndRun(logger *zap.Logger, timeout time.Duration) // TestWatcherSimple() tests the most simple case: "final" API only retruns one block which contains a Wormhole transaction. No re-observation requests. func TestWatcherSimple(t *testing.T) { - timeout := time.Second * 2 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8") tc := testCase{ + name: "TestWatcherSimple", + timeout: time.Second * 2, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -222,19 +245,20 @@ func TestWatcherSimple(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } // TestWatcherReobservation() tests the simple re-observation case: The "final" endpoint returns // the same unrelated block and there is a re-observation request for past data. func TestWatcherReobservation(t *testing.T) { - timeout := time.Second * 5 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8") tc := testCase{ + name: "TestWatcherReobservation", + timeout: time.Second * 5, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -263,19 +287,20 @@ func TestWatcherReobservation(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } // TestWatcherDelayedFinal() tests the case where a block cannot be finalized by a parent having it as // last_final_block and instead needs to be finalized by having it observed as finalized during polling func TestWatcherSimple2(t *testing.T) { - timeout := time.Second * 2 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8") tc := testCase{ + name: "TestWatcherSimple2", + timeout: time.Second * 2, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -305,19 +330,20 @@ func TestWatcherSimple2(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } // TestWatcherDelayedFinal() tests the case where a block cannot be finalized by a parent having it as // last_final_block and instead needs to be finalized by having it observed as finalized during polling func TestWatcherDelayedFinal(t *testing.T) { - timeout := time.Second * 2 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8") tc := testCase{ + name: "TestWatcherDelayedFinal", + timeout: time.Second * 2, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -347,20 +373,21 @@ func TestWatcherDelayedFinal(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } // TestWatcherDelayedFinalAndGaps() tests the case where a block cannot be finalized by a parent having it as // last_final_block and instead needs to be finalized by having it observed as finalized during polling // additionally, there is a large gap between polls func TestWatcherDelayedFinalAndGaps(t *testing.T) { - timeout := time.Second * 2 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") txHashBytes, _ := hex.DecodeString("88029cf0e7432cec04c266a3e72903ee6650b4624c7f9c8e22b04d78e18e87f8") tc := testCase{ + name: "TestWatcherDelayedFinalAndGaps", + timeout: time.Second * 2, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -385,7 +412,7 @@ func TestWatcherDelayedFinalAndGaps(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } // TestWatcherSynthetic(): Case where there are three wormhole messages. Test data is generated (not real) @@ -400,12 +427,13 @@ func TestWatcherDelayedFinalAndGaps(t *testing.T) { "6eCgeVSC4Hwm8tAVy4qNQpnLs4S9EpzRjGtAipwZ632A", // 76538236 block 7: tx3 receipt */ func TestWatcherSynthetic(t *testing.T) { - timeout := time.Second * 2 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") tc := testCase{ + name: "TestWatcherSynthetic", + timeout: time.Second * 2, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -474,7 +502,7 @@ func TestWatcherSynthetic(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } // TestWatcherUnfinalized(): Same as synthetic, but one of the blocks is not finalized and that message has to be excluded. @@ -492,12 +520,13 @@ func TestWatcherSynthetic(t *testing.T) { "6eCgeVSC4Hwm8tAVy4qNQpnLs4S9EpzRjGtAipwZ632A", // 76538236 block 7: tx3 receipt */ func TestWatcherUnfinalized(t *testing.T) { - timeout := time.Second * 2 logger, _ := zap.NewDevelopment() pl, _ := hex.DecodeString("0100000000000000000000000000000000000000000000000000000000000f42400000000000000000000000000000000000000000000000000000000000000000000f0108bc32f7de18a5f6e1e7d6ee7aff9f5fc858d0d87ac0da94dd8d2a5d267d6b00160000000000000000000000000000000000000000000000000000000000000000") tc := testCase{ + name: "TestWatcherUnfinalized", + timeout: time.Second * 2, t: t, wormholeContract: WORMHOLE_CONTRACT, upstreamHost: "", @@ -555,7 +584,7 @@ func TestWatcherUnfinalized(t *testing.T) { }, } - tc.setupAndRun(logger, timeout) + tc.setupAndRun(logger) } func TestSuccessValueToInt(t *testing.T) {