From c13eaed984ae3071d960e69ac8af77605af1f540 Mon Sep 17 00:00:00 2001 From: Alan Chen Date: Wed, 6 Sep 2017 15:59:12 +0800 Subject: [PATCH] k8s: add initial k8s ethereum implementation --- k8s/blockchain.go | 155 +++++++++++++++++++++++++++++++++++++ k8s/blockchain_example.go | 46 +++++++++++ k8s/ethereum.go | 156 ++++++++++++++++++++++++++++++++++++++ k8s/ethereum_example.go | 64 ++++++++++++++++ k8s/option.go | 78 +++++++++++++++++++ k8s/utils.go | 86 +++++++++++++++++++++ 6 files changed, 585 insertions(+) create mode 100644 k8s/blockchain.go create mode 100644 k8s/blockchain_example.go create mode 100644 k8s/ethereum.go create mode 100644 k8s/ethereum_example.go create mode 100644 k8s/option.go create mode 100644 k8s/utils.go diff --git a/k8s/blockchain.go b/k8s/blockchain.go new file mode 100644 index 00000000..40b3a9b5 --- /dev/null +++ b/k8s/blockchain.go @@ -0,0 +1,155 @@ +// Copyright 2017 AMIS Technologies +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package k8s + +import ( + "errors" + "fmt" + "log" + "time" + + "github.com/getamis/istanbul-tools/charts" + istcommon "github.com/getamis/istanbul-tools/common" + "github.com/getamis/istanbul-tools/container" +) + +func NewBlockchain(numOfValidators int, gaslimit uint64, options ...Option) (bc *blockchain) { + _, nodekeys, addrs := istcommon.GenerateKeys(numOfValidators) + ips := istcommon.GenerateIPs(len(nodekeys)) + + bc = &blockchain{ + genesis: charts.NewGenesisChart(addrs, uint64(gaslimit)), + staticNodes: charts.NewStaticNodesChart(nodekeys, ips), + } + + if err := bc.genesis.Install(false); err != nil { + log.Println(err) + return nil + } + if err := bc.staticNodes.Install(false); err != nil { + log.Println(err) + bc.genesis.Uninstall() + return nil + } + bc.setupValidators(numOfValidators, nodekeys, ips, options...) + return bc +} + +// ---------------------------------------------------------------------------- + +type blockchain struct { + genesis *charts.GenesisChart + staticNodes *charts.StaticNodesChart + validators []container.Ethereum +} + +func (bc *blockchain) EnsureConsensusWorking(geths []container.Ethereum, t time.Duration) error { + errCh := make(chan error, len(geths)) + quitCh := make(chan struct{}, len(geths)) + for _, geth := range geths { + go geth.ConsensusMonitor(errCh, quitCh) + } + + timeout := time.NewTimer(t) + defer timeout.Stop() + + var err error + select { + case err = <-errCh: + case <-timeout.C: + for i := 0; i < len(geths); i++ { + quitCh <- struct{}{} + } + } + return err +} + +func (bc *blockchain) AddValidators(numOfValidators int) ([]container.Ethereum, error) { + return nil, errors.New("Unsupported") +} + +func (bc *blockchain) RemoveValidators(candidates []container.Ethereum, processingTime time.Duration) error { + return errors.New("Unsupported") +} + +func (bc *blockchain) Start(strong bool) error { + return bc.start(bc.validators) +} + +func (bc *blockchain) Stop(force bool) error { + return bc.stop(bc.validators, force) +} + +func (bc *blockchain) Finalize() { + for _, v := range bc.validators { + v.Stop() + } + + bc.staticNodes.Uninstall() + bc.genesis.Uninstall() +} + +func (bc *blockchain) Validators() []container.Ethereum { + return bc.validators +} + +func (bc *blockchain) CreateNodes(num int, options ...Option) (nodes []container.Ethereum, err error) { + return nil, errors.New("Unsupported") +} + +// ---------------------------------------------------------------------------- + +func (bc *blockchain) setupValidators(num int, nodekeys []string, ips []string, options ...Option) { + for i := 0; i < num; i++ { + var opts []Option + opts = append(opts, options...) + + opts = append(opts, Name(fmt.Sprintf("%d", i))) + opts = append(opts, NodeKeyHex(nodekeys[i])) + opts = append(opts, IPAddress(ips[i])) + + geth := NewEthereum( + opts..., + ) + + bc.validators = append(bc.validators, geth) + } +} + +func (bc *blockchain) start(validators []container.Ethereum) error { + var fns []func() error + + for _, v := range validators { + geth := v + fns = append(fns, func() error { + return geth.Start() + }) + } + return executeInParallel(fns...) +} + +func (bc *blockchain) stop(validators []container.Ethereum, force bool) error { + var fns []func() error + + for _, v := range validators { + geth := v + fns = append(fns, func() error { + return geth.Stop() + }) + } + return executeInParallel(fns...) +} diff --git a/k8s/blockchain_example.go b/k8s/blockchain_example.go new file mode 100644 index 00000000..e7258653 --- /dev/null +++ b/k8s/blockchain_example.go @@ -0,0 +1,46 @@ +// Copyright 2017 AMIS Technologies +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package k8s + +import ( + "testing" + "time" +) + +func ExampleK8SBlockchain(t *testing.T) { + chain := NewBlockchain( + 4, + 21000*1000, + ImageRepository("quay.io/amis/geth"), + ImageTag("istanbul_develop"), + ServiceType("LoadBalancer"), + Mine(), + ) + defer chain.Finalize() + + err := chain.Start(true) + if err != nil { + t.Error(err) + } + + <-time.After(20 * time.Second) + + err = chain.Stop(false) + if err != nil { + t.Error(err) + } +} diff --git a/k8s/ethereum.go b/k8s/ethereum.go new file mode 100644 index 00000000..5f5a0e6e --- /dev/null +++ b/k8s/ethereum.go @@ -0,0 +1,156 @@ +// Copyright 2017 AMIS Technologies +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package k8s + +import ( + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + + "github.com/ethereum/go-ethereum/accounts" + "github.com/ethereum/go-ethereum/common" + + "github.com/getamis/istanbul-tools/charts" + "github.com/getamis/istanbul-tools/client" + istcommon "github.com/getamis/istanbul-tools/common" +) + +func NewEthereum(options ...Option) *ethereum { + eth := ðereum{ + name: istcommon.RandomHex(), + } + + for _, opt := range options { + opt(eth) + } + + eth.chart = charts.NewValidatorChart(eth.name, eth.args) + + return eth +} + +type ethereum struct { + chart *charts.ValidatorChart + name string + args []string + + k8sClient *kubernetes.Clientset +} + +func (eth *ethereum) Init(genesisFile string) error { + return nil +} + +func (eth *ethereum) Start() error { + if err := eth.chart.Install(false); err != nil { + return err + } + + eth.k8sClient = k8sClient(eth.chart.Name() + "-0") + return nil +} + +func (eth *ethereum) Stop() error { + return eth.chart.Uninstall() +} + +func (eth *ethereum) Wait(t time.Duration) error { + return nil +} + +func (eth *ethereum) Running() bool { + return false +} + +func (eth *ethereum) ContainerID() string { + return "" +} + +func (eth *ethereum) DockerEnv() []string { + return nil +} + +func (eth *ethereum) DockerBinds() []string { + return nil +} + +func (eth *ethereum) NewClient() *client.Client { + client, err := client.Dial("ws://" + eth.Host() + ":8545") + if err != nil { + return nil + } + return client +} + +func (eth *ethereum) NodeAddress() string { + return "" +} + +func (eth *ethereum) Address() common.Address { + return common.Address{} +} + +func (eth *ethereum) ConsensusMonitor(errCh chan<- error, quit chan struct{}) { + +} + +func (eth *ethereum) WaitForProposed(expectedAddress common.Address, timeout time.Duration) error { + return nil +} + +func (eth *ethereum) WaitForPeersConnected(expectedPeercount int) error { + return nil +} + +func (eth *ethereum) WaitForBlocks(num int, waitingTime ...time.Duration) error { + return nil +} + +func (eth *ethereum) WaitForBlockHeight(num int) error { + return nil +} + +func (eth *ethereum) WaitForNoBlocks(num int, duration time.Duration) error { + return nil +} + +func (eth *ethereum) AddPeer(address string) error { + return nil +} + +func (eth *ethereum) StartMining() error { + return nil +} + +func (eth *ethereum) StopMining() error { + return nil +} + +func (eth *ethereum) Accounts() []accounts.Account { + return nil +} + +// ---------------------------------------------------------------------------- + +func (eth *ethereum) Host() string { + svc, err := eth.k8sClient.CoreV1().Services(defaultNamespace).Get(eth.chart.Name()+"-0", metav1.GetOptions{}) + if err != nil { + return "" + } + return svc.Spec.LoadBalancerIP +} diff --git a/k8s/ethereum_example.go b/k8s/ethereum_example.go new file mode 100644 index 00000000..5dbdd895 --- /dev/null +++ b/k8s/ethereum_example.go @@ -0,0 +1,64 @@ +// Copyright 2017 AMIS Technologies +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package k8s + +import ( + "fmt" + + "github.com/getamis/istanbul-tools/charts" + "github.com/getamis/istanbul-tools/common" + "github.com/getamis/istanbul-tools/genesis" +) + +func ExampleK8SEthereum() { + _, nodekeys, addrs := common.GenerateKeys(1) + genesisChart := charts.NewGenesisChart(addrs, genesis.InitGasLimit) + if err := genesisChart.Install(false); err != nil { + fmt.Println(err) + return + } + defer genesisChart.Uninstall() + + staticNodesChart := charts.NewStaticNodesChart(nodekeys, common.GenerateIPs(len(nodekeys))) + if err := staticNodesChart.Install(false); err != nil { + fmt.Println(err) + return + } + defer staticNodesChart.Uninstall() + + geth := NewEthereum( + ImageRepository("quay.io/amis/geth"), + ImageTag("istanbul_develop"), + + Name("test"), + ServiceType("LoadBalancer"), + IPAddress("10.0.1.100"), + NodeKeyHex(common.RandomHex()[2:]), + ) + + err := geth.Start() + if err != nil { + fmt.Println(err) + return + } + + err = geth.Stop() + if err != nil { + fmt.Println(err) + return + } +} diff --git a/k8s/option.go b/k8s/option.go new file mode 100644 index 00000000..933b8992 --- /dev/null +++ b/k8s/option.go @@ -0,0 +1,78 @@ +// Copyright 2017 AMIS Technologies +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package k8s + +import "fmt" + +type Option func(*ethereum) + +func ImageRepository(repository string) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("image.respository=%s", repository)) + } +} + +func ImageTag(tag string) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("image.tag=%s", tag)) + } +} + +// ---------------------------------------------------------------------------- + +func Name(name string) Option { + return func(eth *ethereum) { + eth.name = name + eth.args = append(eth.args, fmt.Sprintf("nameOverride=%s", name)) + } +} + +func ServiceType(serviceType string) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("service.type=%s", serviceType)) + } +} + +func IPAddress(ip string) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("service.staticIP=%s", ip)) + } +} + +func NetworkID(networkID string) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("ethereum.networkID=%s", networkID)) + } +} + +func Mine() Option { + return func(eth *ethereum) { + eth.args = append(eth.args, "ethereum.mining.enabled=true") + } +} + +func NodeKeyHex(hex string) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("ethereum.nodekey.hex=%s", hex)) + } +} + +func Verbosity(verbosity int) Option { + return func(eth *ethereum) { + eth.args = append(eth.args, fmt.Sprintf("ethereum.verbosity=%d", verbosity)) + } +} diff --git a/k8s/utils.go b/k8s/utils.go new file mode 100644 index 00000000..dc644bd2 --- /dev/null +++ b/k8s/utils.go @@ -0,0 +1,86 @@ +// Copyright 2017 AMIS Technologies +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package k8s + +import ( + "log" + "os" + "path/filepath" + "sync" + "time" + + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/tools/clientcmd" +) + +const ( + defaultNamespace = "default" + + healthCheckRetryCount = 5 + healthCheckRetryDelay = 5 * time.Second +) + +func k8sClient(podName string) *kubernetes.Clientset { + config, err := clientcmd.BuildConfigFromFlags("", filepath.Join(os.Getenv("HOME"), ".kube", "config")) + if err != nil { + log.Fatalln(err) + } + + for i := 0; i < healthCheckRetryCount; i++ { + client, err := kubernetes.NewForConfig(config) + if err != nil { + log.Println(err) + <-time.After(healthCheckRetryDelay) + continue + } + _, err = client.CoreV1().Pods(defaultNamespace).Get(podName, metav1.GetOptions{}) + if err != nil { + log.Println(err) + <-time.After(healthCheckRetryDelay) + continue + } else { + return client + } + } + + log.Fatalln("Failed to retrieve kubernetes client") + return nil +} + +func executeInParallel(fns ...func() error) error { + var wg sync.WaitGroup + errc := make(chan error, len(fns)) + wg.Add(len(fns)) + + for _, fn := range fns { + fn := fn + go func() { + defer wg.Done() + errc <- fn() + }() + } + // Wait for the first error, then terminate the others. + var err error + for i := 0; i < len(fns); i++ { + if err = <-errc; err != nil { + return err + } + } + wg.Wait() + return err +}