diff --git a/container/blockchain.go b/container/blockchain.go index 7814287d..eb280c18 100644 --- a/container/blockchain.go +++ b/container/blockchain.go @@ -22,6 +22,7 @@ import ( "log" "os" "path/filepath" + "time" "github.com/docker/docker/client" "github.com/ethereum/go-ethereum/common" @@ -31,6 +32,7 @@ import ( ) type Blockchain interface { + EnsureConsensusWorking(geths []Ethereum, t time.Duration) error Start() error Stop(bool) error Validators() []Ethereum @@ -61,6 +63,27 @@ type blockchain struct { validators []Ethereum } +func (bc *blockchain) EnsureConsensusWorking(geths []Ethereum, t time.Duration) error { + errCh := make(chan error, len(geths)) + quitCh := make(chan struct{}, len(geths)) + for _, geth := range geths { + go geth.ConsensusMonitor(errCh, quitCh) + } + + timeout := time.NewTimer(t) + defer timeout.Stop() + + var err error + select { + case err = <-errCh: + case <-timeout.C: + for i := 0; i < len(geths); i++ { + quitCh <- struct{}{} + } + } + return err +} + func (bc *blockchain) Start() error { for _, v := range bc.validators { if err := v.Start(); err != nil { diff --git a/container/ethereum.go b/container/ethereum.go index ef62fe53..172f4375 100644 --- a/container/ethereum.go +++ b/container/ethereum.go @@ -36,6 +36,7 @@ import ( "github.com/docker/docker/client" "github.com/docker/go-connections/nat" "github.com/ethereum/go-ethereum/cmd/utils" + ethtypes "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/p2p/discover" @@ -48,6 +49,10 @@ const ( healthCheckRetryDelay = 2 * time.Second ) +var ( + ErrConsensusTimeout = errors.New("consensus timeout") +) + type Ethereum interface { Init(string) error Start() error @@ -59,6 +64,7 @@ type Ethereum interface { Host() string NewClient() *ethclient.Client NewIstanbulClient() *istclient.Client + ConsensusMonitor(err chan<- error, quit chan struct{}) } func NewEthereum(c *client.Client, options ...Option) *ethereum { @@ -357,6 +363,47 @@ func (eth *ethereum) NodeAddress() string { return "" } +func (eth *ethereum) ConsensusMonitor(errCh chan<- error, quit chan struct{}) { + cli := eth.NewClient() + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + subCh := make(chan *ethtypes.Header) + + sub, err := cli.SubscribeNewHead(ctx, subCh) + if err != nil { + log.Fatal(fmt.Sprintf("subscribe error:%v", err)) + errCh <- err + return + } + defer sub.Unsubscribe() + + for { + latestUpdate := time.Now() + select { + case err := <-sub.Err(): + log.Printf("Connection lost: %v", err) + errCh <- err + return + case head := <-subCh: + // Ensure that mining is stable. + if head.Number.Uint64() < 3 { + continue + } + + // Block is generated by 2 seconds. We tolerate 1 second delay in consensus. + if time.Now().Sub(latestUpdate).Seconds() > 3.0 { + errCh <- ErrConsensusTimeout + return + } + latestUpdate = time.Now() + case <-quit: + return + } + } +} + // ---------------------------------------------------------------------------- func (eth *ethereum) showLog(context context.Context) { diff --git a/tests/integration_test.go b/tests/integration_test.go index 03f0bf9c..1f2910d7 100644 --- a/tests/integration_test.go +++ b/tests/integration_test.go @@ -20,6 +20,7 @@ import ( "context" "math/big" "testing" + "time" . "github.com/onsi/ginkgo" . "github.com/onsi/gomega" @@ -43,7 +44,7 @@ var _ = Describe("4 validators Istanbul", func() { container.DataDir("/data"), container.WebSocket(), container.WebSocketAddress("0.0.0.0"), - container.WebSocketAPI("admin,eth,net,web3,personal,miner"), + container.WebSocketAPI("admin,eth,net,web3,personal,miner,istanbul"), container.WebSocketOrigin("*"), container.NAT("any"), container.NoDiscover(), @@ -69,6 +70,10 @@ var _ = Describe("4 validators Istanbul", func() { Expect(err).To(BeNil()) Expect(block).NotTo(BeNil()) } + + By("Ensure that consensus is working in 30 seconds", func() { + Expect(blockchain.EnsureConsensusWorking(blockchain.Validators(), 30*time.Second)).Should(BeNil()) + }) }) })