Merge pull request #25 from getamis/feature/add-simple-consensus-monitor
container: add monitor to detect consensus status
This commit is contained in:
commit
80c6aa0738
|
@ -22,6 +22,7 @@ import (
|
||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"path/filepath"
|
"path/filepath"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
"github.com/ethereum/go-ethereum/common"
|
"github.com/ethereum/go-ethereum/common"
|
||||||
|
@ -31,6 +32,7 @@ import (
|
||||||
)
|
)
|
||||||
|
|
||||||
type Blockchain interface {
|
type Blockchain interface {
|
||||||
|
EnsureConsensusWorking(geths []Ethereum, t time.Duration) error
|
||||||
Start() error
|
Start() error
|
||||||
Stop(bool) error
|
Stop(bool) error
|
||||||
Validators() []Ethereum
|
Validators() []Ethereum
|
||||||
|
@ -61,6 +63,27 @@ type blockchain struct {
|
||||||
validators []Ethereum
|
validators []Ethereum
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bc *blockchain) EnsureConsensusWorking(geths []Ethereum, t time.Duration) error {
|
||||||
|
errCh := make(chan error, len(geths))
|
||||||
|
quitCh := make(chan struct{}, len(geths))
|
||||||
|
for _, geth := range geths {
|
||||||
|
go geth.ConsensusMonitor(errCh, quitCh)
|
||||||
|
}
|
||||||
|
|
||||||
|
timeout := time.NewTimer(t)
|
||||||
|
defer timeout.Stop()
|
||||||
|
|
||||||
|
var err error
|
||||||
|
select {
|
||||||
|
case err = <-errCh:
|
||||||
|
case <-timeout.C:
|
||||||
|
for i := 0; i < len(geths); i++ {
|
||||||
|
quitCh <- struct{}{}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
func (bc *blockchain) Start() error {
|
func (bc *blockchain) Start() error {
|
||||||
for _, v := range bc.validators {
|
for _, v := range bc.validators {
|
||||||
if err := v.Start(); err != nil {
|
if err := v.Start(); err != nil {
|
||||||
|
|
|
@ -36,6 +36,7 @@ import (
|
||||||
"github.com/docker/docker/client"
|
"github.com/docker/docker/client"
|
||||||
"github.com/docker/go-connections/nat"
|
"github.com/docker/go-connections/nat"
|
||||||
"github.com/ethereum/go-ethereum/cmd/utils"
|
"github.com/ethereum/go-ethereum/cmd/utils"
|
||||||
|
ethtypes "github.com/ethereum/go-ethereum/core/types"
|
||||||
"github.com/ethereum/go-ethereum/ethclient"
|
"github.com/ethereum/go-ethereum/ethclient"
|
||||||
"github.com/ethereum/go-ethereum/p2p/discover"
|
"github.com/ethereum/go-ethereum/p2p/discover"
|
||||||
|
|
||||||
|
@ -48,6 +49,10 @@ const (
|
||||||
healthCheckRetryDelay = 2 * time.Second
|
healthCheckRetryDelay = 2 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
|
var (
|
||||||
|
ErrConsensusTimeout = errors.New("consensus timeout")
|
||||||
|
)
|
||||||
|
|
||||||
type Ethereum interface {
|
type Ethereum interface {
|
||||||
Init(string) error
|
Init(string) error
|
||||||
Start() error
|
Start() error
|
||||||
|
@ -59,6 +64,7 @@ type Ethereum interface {
|
||||||
Host() string
|
Host() string
|
||||||
NewClient() *ethclient.Client
|
NewClient() *ethclient.Client
|
||||||
NewIstanbulClient() *istclient.Client
|
NewIstanbulClient() *istclient.Client
|
||||||
|
ConsensusMonitor(err chan<- error, quit chan struct{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewEthereum(c *client.Client, options ...Option) *ethereum {
|
func NewEthereum(c *client.Client, options ...Option) *ethereum {
|
||||||
|
@ -357,6 +363,47 @@ func (eth *ethereum) NodeAddress() string {
|
||||||
return ""
|
return ""
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (eth *ethereum) ConsensusMonitor(errCh chan<- error, quit chan struct{}) {
|
||||||
|
cli := eth.NewClient()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
subCh := make(chan *ethtypes.Header)
|
||||||
|
|
||||||
|
sub, err := cli.SubscribeNewHead(ctx, subCh)
|
||||||
|
if err != nil {
|
||||||
|
log.Fatal(fmt.Sprintf("subscribe error:%v", err))
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
}
|
||||||
|
defer sub.Unsubscribe()
|
||||||
|
|
||||||
|
for {
|
||||||
|
latestUpdate := time.Now()
|
||||||
|
select {
|
||||||
|
case err := <-sub.Err():
|
||||||
|
log.Printf("Connection lost: %v", err)
|
||||||
|
errCh <- err
|
||||||
|
return
|
||||||
|
case head := <-subCh:
|
||||||
|
// Ensure that mining is stable.
|
||||||
|
if head.Number.Uint64() < 3 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Block is generated by 2 seconds. We tolerate 1 second delay in consensus.
|
||||||
|
if time.Now().Sub(latestUpdate).Seconds() > 3.0 {
|
||||||
|
errCh <- ErrConsensusTimeout
|
||||||
|
return
|
||||||
|
}
|
||||||
|
latestUpdate = time.Now()
|
||||||
|
case <-quit:
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// ----------------------------------------------------------------------------
|
// ----------------------------------------------------------------------------
|
||||||
|
|
||||||
func (eth *ethereum) showLog(context context.Context) {
|
func (eth *ethereum) showLog(context context.Context) {
|
||||||
|
|
|
@ -20,6 +20,7 @@ import (
|
||||||
"context"
|
"context"
|
||||||
"math/big"
|
"math/big"
|
||||||
"testing"
|
"testing"
|
||||||
|
"time"
|
||||||
|
|
||||||
. "github.com/onsi/ginkgo"
|
. "github.com/onsi/ginkgo"
|
||||||
. "github.com/onsi/gomega"
|
. "github.com/onsi/gomega"
|
||||||
|
@ -43,7 +44,7 @@ var _ = Describe("4 validators Istanbul", func() {
|
||||||
container.DataDir("/data"),
|
container.DataDir("/data"),
|
||||||
container.WebSocket(),
|
container.WebSocket(),
|
||||||
container.WebSocketAddress("0.0.0.0"),
|
container.WebSocketAddress("0.0.0.0"),
|
||||||
container.WebSocketAPI("admin,eth,net,web3,personal,miner"),
|
container.WebSocketAPI("admin,eth,net,web3,personal,miner,istanbul"),
|
||||||
container.WebSocketOrigin("*"),
|
container.WebSocketOrigin("*"),
|
||||||
container.NAT("any"),
|
container.NAT("any"),
|
||||||
container.NoDiscover(),
|
container.NoDiscover(),
|
||||||
|
@ -69,6 +70,10 @@ var _ = Describe("4 validators Istanbul", func() {
|
||||||
Expect(err).To(BeNil())
|
Expect(err).To(BeNil())
|
||||||
Expect(block).NotTo(BeNil())
|
Expect(block).NotTo(BeNil())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
By("Ensure that consensus is working in 30 seconds", func() {
|
||||||
|
Expect(blockchain.EnsureConsensusWorking(blockchain.Validators(), 30*time.Second)).Should(BeNil())
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
|
||||||
|
|
Loading…
Reference in New Issue