Merge pull request #80 from getamis/refactor-docker-network

container: refactor docker network
This commit is contained in:
Miya Chen 2017-09-14 17:24:35 +08:00 committed by GitHub
commit a1dd7527d4
4 changed files with 126 additions and 129 deletions

View File

@ -56,7 +56,12 @@ type Blockchain interface {
} }
func NewBlockchain(network *DockerNetwork, numOfValidators int, options ...Option) (bc *blockchain) { func NewBlockchain(network *DockerNetwork, numOfValidators int, options ...Option) (bc *blockchain) {
bc = &blockchain{opts: options} if network == nil {
log.Error("Docker network is required")
return nil
}
bc = &blockchain{dockerNetwork: network, opts: options}
var err error var err error
bc.dockerClient, err = client.NewEnvClient() bc.dockerClient, err = client.NewEnvClient()
@ -65,18 +70,7 @@ func NewBlockchain(network *DockerNetwork, numOfValidators int, options ...Optio
return nil return nil
} }
if network == nil { bc.opts = append(bc.opts, DockerNetworkName(bc.dockerNetwork.Name()))
bc.defaultNetwork, err = NewDockerNetwork()
if err != nil {
log.Error("Failed to create Docker network", "err", err)
return nil
}
network = bc.defaultNetwork
}
bc.dockerNetworkName = network.Name()
bc.getFreeIPAddrs = network.GetFreeIPAddrs
bc.opts = append(bc.opts, DockerNetworkName(bc.dockerNetworkName))
//Create accounts //Create accounts
bc.generateAccounts(numOfValidators) bc.generateAccounts(numOfValidators)
@ -107,7 +101,13 @@ func NewDefaultBlockchain(network *DockerNetwork, numOfValidators int) (bc *bloc
} }
func NewDefaultBlockchainWithFaulty(network *DockerNetwork, numOfNormal int, numOfFaulty int) (bc *blockchain) { func NewDefaultBlockchainWithFaulty(network *DockerNetwork, numOfNormal int, numOfFaulty int) (bc *blockchain) {
if network == nil {
log.Error("Docker network is required")
return nil
}
commonOpts := [...]Option{ commonOpts := [...]Option{
DockerNetworkName(network.Name()),
DataDir("/data"), DataDir("/data"),
WebSocket(), WebSocket(),
WebSocketAddress("0.0.0.0"), WebSocketAddress("0.0.0.0"),
@ -129,7 +129,7 @@ func NewDefaultBlockchainWithFaulty(network *DockerNetwork, numOfNormal int, num
faultyOpts = append(faultyOpts, ImageRepository("quay.io/amis/geth_faulty"), ImageTag("latest"), FaultyMode(1)) faultyOpts = append(faultyOpts, ImageRepository("quay.io/amis/geth_faulty"), ImageTag("latest"), FaultyMode(1))
// New env client // New env client
bc = &blockchain{} bc = &blockchain{dockerNetwork: network}
var err error var err error
bc.dockerClient, err = client.NewEnvClient() bc.dockerClient, err = client.NewEnvClient()
if err != nil { if err != nil {
@ -137,24 +137,9 @@ func NewDefaultBlockchainWithFaulty(network *DockerNetwork, numOfNormal int, num
return nil return nil
} }
if network == nil {
bc.defaultNetwork, err = NewDockerNetwork()
if err != nil {
log.Error("Failed to create Docker network", "err", err)
return nil
}
network = bc.defaultNetwork
}
bc.dockerNetworkName = network.Name()
bc.getFreeIPAddrs = network.GetFreeIPAddrs
normalOpts = append(normalOpts, DockerNetworkName(bc.dockerNetworkName))
faultyOpts = append(faultyOpts, DockerNetworkName(bc.dockerNetworkName))
totalNodes := numOfNormal + numOfFaulty totalNodes := numOfNormal + numOfFaulty
ips, err := bc.getFreeIPAddrs(totalNodes) ips, err := bc.dockerNetwork.GetFreeIPAddrs(totalNodes)
if err != nil { if err != nil {
log.Error("Failed to get free ip addresses", "err", err) log.Error("Failed to get free ip addresses", "err", err)
return nil return nil
@ -175,7 +160,12 @@ func NewDefaultBlockchainWithFaulty(network *DockerNetwork, numOfNormal int, num
} }
func NewQuorumBlockchain(network *DockerNetwork, ctn ConstellationNetwork, options ...Option) (bc *blockchain) { func NewQuorumBlockchain(network *DockerNetwork, ctn ConstellationNetwork, options ...Option) (bc *blockchain) {
bc = &blockchain{opts: options, isQuorum: true, constellationNetwork: ctn} if network == nil {
log.Error("Docker network is required")
return nil
}
bc = &blockchain{dockerNetwork: network, opts: options, isQuorum: true, constellationNetwork: ctn}
bc.opts = append(bc.opts, IsQuorum(true)) bc.opts = append(bc.opts, IsQuorum(true))
bc.opts = append(bc.opts, NoUSB()) bc.opts = append(bc.opts, NoUSB())
@ -186,18 +176,7 @@ func NewQuorumBlockchain(network *DockerNetwork, ctn ConstellationNetwork, optio
return nil return nil
} }
if network == nil { bc.opts = append(bc.opts, DockerNetworkName(bc.dockerNetwork.Name()))
bc.defaultNetwork, err = NewDockerNetwork()
if err != nil {
log.Error("Failed to create Docker network", "err", err)
return nil
}
network = bc.defaultNetwork
}
bc.dockerNetworkName = network.Name()
bc.getFreeIPAddrs = network.GetFreeIPAddrs
bc.opts = append(bc.opts, DockerNetworkName(bc.dockerNetworkName))
//Create accounts //Create accounts
bc.generateAccounts(ctn.NumOfConstellations()) bc.generateAccounts(ctn.NumOfConstellations())
@ -228,7 +207,13 @@ func NewDefaultQuorumBlockchain(network *DockerNetwork, ctn ConstellationNetwork
} }
func NewDefaultQuorumBlockchainWithFaulty(network *DockerNetwork, ctn ConstellationNetwork, numOfNormal int, numOfFaulty int) (bc *blockchain) { func NewDefaultQuorumBlockchainWithFaulty(network *DockerNetwork, ctn ConstellationNetwork, numOfNormal int, numOfFaulty int) (bc *blockchain) {
if network == nil {
log.Error("Docker network is required")
return nil
}
commonOpts := [...]Option{ commonOpts := [...]Option{
DockerNetworkName(network.Name()),
DataDir("/data"), DataDir("/data"),
WebSocket(), WebSocket(),
WebSocketAddress("0.0.0.0"), WebSocketAddress("0.0.0.0"),
@ -249,11 +234,10 @@ func NewDefaultQuorumBlockchainWithFaulty(network *DockerNetwork, ctn Constellat
normalOpts = append(normalOpts, ImageRepository("quay.io/amis/quorum"), ImageTag("feature_istanbul")) normalOpts = append(normalOpts, ImageRepository("quay.io/amis/quorum"), ImageTag("feature_istanbul"))
faultyOpts := make([]Option, len(commonOpts), len(commonOpts)+3) faultyOpts := make([]Option, len(commonOpts), len(commonOpts)+3)
copy(faultyOpts, commonOpts[:]) copy(faultyOpts, commonOpts[:])
// FIXME: Needs a faulty quorum
faultyOpts = append(faultyOpts, ImageRepository("quay.io/amis/quorum_faulty"), ImageTag("latest"), FaultyMode(1)) faultyOpts = append(faultyOpts, ImageRepository("quay.io/amis/quorum_faulty"), ImageTag("latest"), FaultyMode(1))
// New env client // New env client
bc = &blockchain{isQuorum: true, constellationNetwork: ctn} bc = &blockchain{dockerNetwork: network, isQuorum: true, constellationNetwork: ctn}
var err error var err error
bc.dockerClient, err = client.NewEnvClient() bc.dockerClient, err = client.NewEnvClient()
if err != nil { if err != nil {
@ -261,24 +245,9 @@ func NewDefaultQuorumBlockchainWithFaulty(network *DockerNetwork, ctn Constellat
return nil return nil
} }
if network == nil {
bc.defaultNetwork, err = NewDockerNetwork()
if err != nil {
log.Error("Failed to create Docker network", "err", err)
return nil
}
network = bc.defaultNetwork
}
bc.dockerNetworkName = network.Name()
bc.getFreeIPAddrs = network.GetFreeIPAddrs
normalOpts = append(normalOpts, DockerNetworkName(bc.dockerNetworkName))
faultyOpts = append(faultyOpts, DockerNetworkName(bc.dockerNetworkName))
totalNodes := numOfNormal + numOfFaulty totalNodes := numOfNormal + numOfFaulty
ips, err := bc.getFreeIPAddrs(totalNodes) ips, err := bc.dockerNetwork.GetFreeIPAddrs(totalNodes)
if err != nil { if err != nil {
log.Error("Failed to get free ip addresses", "err", err) log.Error("Failed to get free ip addresses", "err", err)
return nil return nil
@ -302,9 +271,7 @@ func NewDefaultQuorumBlockchainWithFaulty(network *DockerNetwork, ctn Constellat
type blockchain struct { type blockchain struct {
dockerClient *client.Client dockerClient *client.Client
defaultNetwork *DockerNetwork dockerNetwork *DockerNetwork
dockerNetworkName string
getFreeIPAddrs func(int) ([]net.IP, error)
genesisFile string genesisFile string
isQuorum bool isQuorum bool
validators []Ethereum validators []Ethereum
@ -399,11 +366,6 @@ func (bc *blockchain) Stop(force bool) error {
return err return err
} }
if bc.defaultNetwork != nil {
err := bc.defaultNetwork.Remove()
bc.defaultNetwork = nil
return err
}
return nil return nil
} }
@ -416,7 +378,7 @@ func (bc *blockchain) Validators() []Ethereum {
} }
func (bc *blockchain) CreateNodes(num int, options ...Option) (nodes []Ethereum, err error) { func (bc *blockchain) CreateNodes(num int, options ...Option) (nodes []Ethereum, err error) {
ips, err := bc.getFreeIPAddrs(num) ips, err := bc.dockerNetwork.GetFreeIPAddrs(num)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -434,7 +396,7 @@ func (bc *blockchain) CreateNodes(num int, options ...Option) (nodes []Ethereum,
opts = append(opts, HostDataDir(dataDir)) opts = append(opts, HostDataDir(dataDir))
opts = append(opts, HostWebSocketPort(freeport.GetPort())) opts = append(opts, HostWebSocketPort(freeport.GetPort()))
opts = append(opts, HostIP(ips[i])) opts = append(opts, HostIP(ips[i]))
opts = append(opts, DockerNetworkName(bc.dockerNetworkName)) opts = append(opts, DockerNetworkName(bc.dockerNetwork.Name()))
geth := NewEthereum( geth := NewEthereum(
bc.dockerClient, bc.dockerClient,
@ -456,7 +418,7 @@ func (bc *blockchain) CreateNodes(num int, options ...Option) (nodes []Ethereum,
// ---------------------------------------------------------------------------- // ----------------------------------------------------------------------------
func (bc *blockchain) addValidators(numOfValidators int) error { func (bc *blockchain) addValidators(numOfValidators int) error {
ips, err := bc.getFreeIPAddrs(numOfValidators) ips, err := bc.dockerNetwork.GetFreeIPAddrs(numOfValidators)
if err != nil { if err != nil {
return err return err
} }
@ -597,7 +559,11 @@ type ConstellationNetwork interface {
} }
func NewConstellationNetwork(network *DockerNetwork, numOfValidators int, options ...ConstellationOption) (ctn *constellationNetwork) { func NewConstellationNetwork(network *DockerNetwork, numOfValidators int, options ...ConstellationOption) (ctn *constellationNetwork) {
ctn = &constellationNetwork{opts: options} if network == nil {
log.Error("Docker network is required")
return nil
}
ctn = &constellationNetwork{dockerNetwork: network, opts: options}
var err error var err error
ctn.dockerClient, err = client.NewEnvClient() ctn.dockerClient, err = client.NewEnvClient()
@ -606,14 +572,7 @@ func NewConstellationNetwork(network *DockerNetwork, numOfValidators int, option
return nil return nil
} }
if network == nil { ctn.opts = append(ctn.opts, CTDockerNetworkName(ctn.dockerNetwork.Name()))
log.Error("Network is required")
return
}
ctn.dockerNetworkName = network.Name()
ctn.getFreeIPAddrs = network.GetFreeIPAddrs
ctn.opts = append(ctn.opts, CTDockerNetworkName(ctn.dockerNetworkName))
ctn.setupConstellations(numOfValidators) ctn.setupConstellations(numOfValidators)
return ctn return ctn
@ -685,7 +644,7 @@ func (ctn *constellationNetwork) GetConstellation(idx int) Constellation {
} }
func (ctn *constellationNetwork) getFreeHosts(num int) ([]net.IP, []int) { func (ctn *constellationNetwork) getFreeHosts(num int) ([]net.IP, []int) {
ips, err := ctn.getFreeIPAddrs(num) ips, err := ctn.dockerNetwork.GetFreeIPAddrs(num)
if err != nil { if err != nil {
log.Error("Cannot get free ip", "err", err) log.Error("Cannot get free ip", "err", err)
return nil, nil return nil, nil
@ -709,9 +668,8 @@ func (ctn *constellationNetwork) getOtherNodes(ips []net.IP, ports []int, idx in
} }
type constellationNetwork struct { type constellationNetwork struct {
dockerClient *client.Client dockerClient *client.Client
dockerNetworkName string dockerNetwork *DockerNetwork
getFreeIPAddrs func(int) ([]net.IP, error) opts []ConstellationOption
opts []ConstellationOption constellations []Constellation
constellations []Constellation
} }

View File

@ -22,8 +22,14 @@ import (
) )
func TestEthereumBlockchain(t *testing.T) { func TestEthereumBlockchain(t *testing.T) {
dockerNetwork, err := NewDockerNetwork()
if err != nil {
t.Error(err)
}
defer dockerNetwork.Remove()
chain := NewBlockchain( chain := NewBlockchain(
nil, dockerNetwork,
4, 4,
ImageRepository("quay.io/amis/geth"), ImageRepository("quay.io/amis/geth"),
ImageTag("istanbul_develop"), ImageTag("istanbul_develop"),
@ -34,11 +40,11 @@ func TestEthereumBlockchain(t *testing.T) {
WebSocketOrigin("*"), WebSocketOrigin("*"),
NoDiscover(), NoDiscover(),
Password("password.txt"), Password("password.txt"),
Logging(true), Logging(false),
) )
defer chain.Finalize() defer chain.Finalize()
err := chain.Start(true) err = chain.Start(true)
if err != nil { if err != nil {
t.Error(err) t.Error(err)
} }

View File

@ -48,7 +48,7 @@ func TestConstellationContainer(t *testing.T) {
CTHost(ip, port), CTHost(ip, port),
CTDockerNetworkName(dockerNetwork.Name()), CTDockerNetworkName(dockerNetwork.Name()),
CTWorkDir("/data"), CTWorkDir("/data"),
CTLogging(true), CTLogging(false),
CTKeyName("node"), CTKeyName("node"),
CTSocketFilename("node.ipc"), CTSocketFilename("node.ipc"),
CTVerbosity(3), CTVerbosity(3),

View File

@ -22,6 +22,7 @@ import (
"fmt" "fmt"
"net" "net"
"sync" "sync"
"time"
"github.com/docker/docker/api/types" "github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/network" "github.com/docker/docker/api/types/network"
@ -29,11 +30,35 @@ import (
) )
const ( const (
FirstOctet = 172 FirstOctet = 172
SecondOctet = 19 SecondOctet = 17
NetworkName = "testnet" networkNamePrefix = "testnet"
) )
type NetworkManager interface {
TryGetFreeSubnet() string
}
var defaultNetworkManager = newNetworkManager()
func newNetworkManager() *networkManager {
return &networkManager{
secondOctet: SecondOctet,
}
}
type networkManager struct {
mutex sync.Mutex
secondOctet int
}
func (n *networkManager) TryGetFreeSubnet() string {
n.mutex.Lock()
defer n.mutex.Unlock()
n.secondOctet++
return fmt.Sprintf("%d.%d.0.0/16", FirstOctet, n.secondOctet)
}
type DockerNetwork struct { type DockerNetwork struct {
client *client.Client client *client.Client
id string id string
@ -50,45 +75,53 @@ func NewDockerNetwork() (*DockerNetwork, error) {
return nil, err return nil, err
} }
for i := SecondOctet; i < 256; i++ { network := &DockerNetwork{
// IP xxx.xxx.0.1 is reserved for docker network gateway client: c,
ipv4Addr, ipv4Net, err := net.ParseCIDR(fmt.Sprintf("%d.%d.0.1/16", FirstOctet, i))
networkName := fmt.Sprintf("%s_%d_%d", NetworkName, FirstOctet, i)
if err != nil {
return nil, err
}
network := &DockerNetwork{
client: c,
name: networkName,
ipv4Net: ipv4Net,
ipIndex: ipv4Addr,
}
if err = network.create(); err != nil {
log.Error("Failed to create network, retry...", "err", err)
} else {
return network, nil
}
} }
return nil, err if err := network.create(); err != nil {
return nil, err
}
return network, nil
} }
// create creates a docker network with given subnet // create creates a user-defined docker network
func (n *DockerNetwork) create() error { func (n *DockerNetwork) create() error {
ipamConfig := network.IPAMConfig{ n.name = fmt.Sprintf("%s%d", networkNamePrefix, time.Now().Unix())
Subnet: n.ipv4Net.String(),
} var maxTryCount = 15
ipam := &network.IPAM{ var err error
Config: []network.IPAMConfig{ipamConfig}, var cResp types.NetworkCreateResponse
var subnet string
for i := 0; i < maxTryCount && n.id == ""; i++ {
subnet = defaultNetworkManager.TryGetFreeSubnet()
ipam := &network.IPAM{
Config: []network.IPAMConfig{
network.IPAMConfig{
Subnet: subnet,
},
},
}
cResp, err = n.client.NetworkCreate(context.Background(), n.name, types.NetworkCreate{
IPAM: ipam,
})
if err == nil {
break
}
} }
r, err := n.client.NetworkCreate(context.Background(), n.name, types.NetworkCreate{
IPAM: ipam,
})
if err != nil { if err != nil {
return err return err
} }
n.id = r.ID n.id = cResp.ID
_, n.ipv4Net, err = net.ParseCIDR(subnet)
if err != nil {
return err
}
// IP starts with xxx.xxx.0.1
// Because xxx.xxx.0.1 is reserved for default Gateway IP
n.ipIndex = net.IPv4(n.ipv4Net.IP[0], n.ipv4Net.IP[1], 0, 1)
return nil return nil
} }
@ -100,6 +133,10 @@ func (n *DockerNetwork) Name() string {
return n.name return n.name
} }
func (n *DockerNetwork) Subnet() string {
return n.ipv4Net.String()
}
func (n *DockerNetwork) Remove() error { func (n *DockerNetwork) Remove() error {
return n.client.NetworkRemove(context.Background(), n.id) return n.client.NetworkRemove(context.Background(), n.id)
} }
@ -109,7 +146,7 @@ func (n *DockerNetwork) GetFreeIPAddrs(num int) ([]net.IP, error) {
defer n.mutex.Unlock() defer n.mutex.Unlock()
ips := make([]net.IP, 0) ips := make([]net.IP, 0)
for i := 0; i < num; i++ { for len(ips) < num && n.ipv4Net.Contains(n.ipIndex) {
ip := dupIP(n.ipIndex) ip := dupIP(n.ipIndex)
for j := len(ip) - 1; j >= 0; j-- { for j := len(ip) - 1; j >= 0; j-- {
ip[j]++ ip[j]++
@ -117,12 +154,8 @@ func (n *DockerNetwork) GetFreeIPAddrs(num int) ([]net.IP, error) {
break break
} }
} }
if !n.ipv4Net.Contains(ip) {
break
}
ips = append(ips, ip)
n.ipIndex = ip n.ipIndex = ip
ips = append(ips, ip)
} }
if len(ips) != num { if len(ips) != num {