This commit is contained in:
StephenButtolph 2020-05-23 17:19:16 -04:00
commit 40cb65ed8f
13 changed files with 241 additions and 31 deletions

74
api/health/checks.go Normal file
View File

@ -0,0 +1,74 @@
// (c) 2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package health
import (
"errors"
"time"
)
var (
// ErrHeartbeatNotDetected is returned from a HeartbeatCheckFn when the
// heartbeat has not been detected recently enough
ErrHeartbeatNotDetected = errors.New("heartbeat not detected")
)
// CheckFn returns optional status information and an error indicating health or
// non-health
type CheckFn func() (interface{}, error)
// Check defines a single health check that we want to monitor and consider as
// part of our wider healthiness
type Check struct {
// Name is the identifier for this check and must be unique among all Checks
Name string
// CheckFn is the function to call to perform the the health check
CheckFn CheckFn
// ExecutionPeriod is the duration to wait between executions of this Check
ExecutionPeriod time.Duration
// InitialDelay is the duration to wait before executing the first time
InitialDelay time.Duration
// InitiallyPassing is whether or not to consider the Check healthy before the
// initial execution
InitiallyPassing bool
}
// gosundheitCheck implements the health.Check interface backed by a CheckFn
type gosundheitCheck struct {
name string
checkFn CheckFn
}
// Name implements the health.Check interface by returning a unique name
func (c gosundheitCheck) Name() string { return c.name }
// Execute implements the health.Check interface by executing the checkFn and
// returning the results
func (c gosundheitCheck) Execute() (interface{}, error) { return c.checkFn() }
// Heartbeater provides a getter to the most recently observed heartbeat
type Heartbeater interface {
GetHeartbeat() int64
}
// HeartbeatCheckFn returns a CheckFn that checks the given heartbeater has
// pulsed within the given duration
func HeartbeatCheckFn(hb Heartbeater, max time.Duration) CheckFn {
return func() (data interface{}, err error) {
// Get the heartbeat and create a data set to return to the caller
hb := hb.GetHeartbeat()
data = map[string]int64{"heartbeat": hb}
// If the current time is after the last known heartbeat + the limit then
// mark our check as failed
if time.Unix(hb, 0).Add(max).Before(time.Now()) {
err = ErrHeartbeatNotDetected
}
return data, err
}
}

80
api/health/service.go Normal file
View File

@ -0,0 +1,80 @@
// (c) 2020, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package health
import (
"net/http"
"time"
"github.com/AppsFlyer/go-sundheit"
"github.com/ava-labs/gecko/snow/engine/common"
"github.com/ava-labs/gecko/utils/json"
"github.com/ava-labs/gecko/utils/logging"
"github.com/gorilla/rpc/v2"
)
// defaultCheckOpts is a Check whose properties represent a default Check
var defaultCheckOpts = Check{ExecutionPeriod: time.Minute}
// Health observes a set of vital signs and makes them available through an HTTP
// API.
type Health struct {
log logging.Logger
health health.Health
}
// NewService creates a new Health service
func NewService(log logging.Logger) *Health {
return &Health{log, health.New()}
}
// Handler returns an HTTPHandler providing RPC access to the Health service
func (h *Health) Handler() *common.HTTPHandler {
newServer := rpc.NewServer()
codec := json.NewCodec()
newServer.RegisterCodec(codec, "application/json")
newServer.RegisterCodec(codec, "application/json;charset=UTF-8")
newServer.RegisterService(h, "health")
return &common.HTTPHandler{LockOptions: common.NoLock, Handler: newServer}
}
// RegisterHeartbeat adds a check with default options and a CheckFn that checks
// the given heartbeater for a recent heartbeat
func (h *Health) RegisterHeartbeat(name string, hb Heartbeater, max time.Duration) error {
return h.RegisterCheckFunc(name, HeartbeatCheckFn(hb, max))
}
// RegisterCheckFunc adds a Check with default options and the given CheckFn
func (h *Health) RegisterCheckFunc(name string, checkFn CheckFn) error {
check := defaultCheckOpts
check.Name = name
check.CheckFn = checkFn
return h.RegisterCheck(check)
}
// RegisterCheck adds the given Check
func (h *Health) RegisterCheck(c Check) error {
return h.health.RegisterCheck(&health.Config{
InitialDelay: c.InitialDelay,
ExecutionPeriod: c.ExecutionPeriod,
InitiallyPassing: c.InitiallyPassing,
Check: gosundheitCheck{c.Name, c.CheckFn},
})
}
// GetLivenessArgs are the arguments for GetLiveness
type GetLivenessArgs struct{}
// GetLivenessReply is the response for GetLiveness
type GetLivenessReply struct {
Checks map[string]health.Result `json:"checks"`
Healthy bool `json:"healthy"`
}
// GetLiveness returns a summation of the health of the node
func (h *Health) GetLiveness(_ *http.Request, _ *GetLivenessArgs, reply *GetLivenessReply) error {
h.log.Debug("Health: GetLiveness called")
reply.Checks, reply.Healthy = h.health.Results()
return nil
}

View File

@ -218,7 +218,7 @@ func (ks *Keystore) ExportUser(_ *http.Request, args *ExportUserArgs, reply *Exp
return err
}
if !usr.CheckPassword(args.Password) {
return fmt.Errorf("incorrect password for %s", args.Username)
return fmt.Errorf("incorrect password for user %q", args.Username)
}
userDB := prefixdb.New([]byte(args.Username), ks.bcDB)
@ -274,6 +274,9 @@ func (ks *Keystore) ImportUser(r *http.Request, args *ImportUserArgs, reply *Imp
if err := ks.codec.Unmarshal(args.User.Bytes, &userData); err != nil {
return err
}
if !userData.User.CheckPassword(args.Password) {
return fmt.Errorf("incorrect password for user %q", args.Username)
}
usrBytes, err := ks.codec.Marshal(&userData.User)
if err != nil {
@ -383,7 +386,7 @@ func (ks *Keystore) GetDatabase(bID ids.ID, username, password string) (database
return nil, err
}
if !usr.CheckPassword(password) {
return nil, fmt.Errorf("incorrect password for user '%s'", username)
return nil, fmt.Errorf("incorrect password for user %q", username)
}
userDB := prefixdb.New([]byte(username), ks.bcDB)

View File

@ -255,6 +255,17 @@ func TestServiceExportImport(t *testing.T) {
newKS := Keystore{}
newKS.Initialize(logging.NoLog{}, memdb.New())
{
reply := ImportUserReply{}
if err := newKS.ImportUser(nil, &ImportUserArgs{
Username: "bob",
Password: "",
User: exportReply.User,
}, &reply); err == nil {
t.Fatal("Should have errored due to incorrect password")
}
}
{
reply := ImportUserReply{}
if err := newKS.ImportUser(nil, &ImportUserArgs{

View File

@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"net"
"net/http"
"net/url"
"sync"
@ -28,30 +29,40 @@ var (
// Server maintains the HTTP router
type Server struct {
log logging.Logger
factory logging.Factory
router *router
portURL string
log logging.Logger
factory logging.Factory
router *router
listenAddress string
}
// Initialize creates the API server at the provided port
func (s *Server) Initialize(log logging.Logger, factory logging.Factory, port uint16) {
// Initialize creates the API server at the provided host and port
func (s *Server) Initialize(log logging.Logger, factory logging.Factory, host string, port uint16) {
s.log = log
s.factory = factory
s.portURL = fmt.Sprintf(":%d", port)
s.listenAddress = fmt.Sprintf("%s:%d", host, port)
s.router = newRouter()
}
// Dispatch starts the API server
func (s *Server) Dispatch() error {
handler := cors.Default().Handler(s.router)
return http.ListenAndServe(s.portURL, handler)
listener, err := net.Listen("tcp", s.listenAddress)
if err != nil {
return err
}
s.log.Info("API server listening on %q", s.listenAddress)
return http.Serve(listener, handler)
}
// DispatchTLS starts the API server with the provided TLS certificate
func (s *Server) DispatchTLS(certFile, keyFile string) error {
handler := cors.Default().Handler(s.router)
return http.ListenAndServeTLS(s.portURL, certFile, keyFile, handler)
listener, err := net.Listen("tcp", s.listenAddress)
if err != nil {
return err
}
s.log.Info("API server listening on %q", s.listenAddress)
return http.ServeTLS(listener, handler, certFile, keyFile)
}
// RegisterChain registers the API endpoints associated with this chain That

View File

@ -30,7 +30,7 @@ func (s *Service) Call(_ *http.Request, args *Args, reply *Reply) error {
func TestCall(t *testing.T) {
s := Server{}
s.Initialize(logging.NoLog{}, logging.NoFactory{}, 8080)
s.Initialize(logging.NoLog{}, logging.NoFactory{}, "localhost", 8080)
serv := &Service{}
newServer := rpc.NewServer()

View File

@ -93,6 +93,7 @@ func init() {
consensusIP := fs.String("public-ip", "", "Public IP of this node")
// HTTP Server:
httpHost := fs.String("http-host", "", "Address of the HTTP server")
httpPort := fs.Uint("http-port", 9650, "Port of the HTTP server")
fs.BoolVar(&Config.EnableHTTPS, "http-tls-enabled", false, "Upgrade the HTTP server to HTTPs")
fs.StringVar(&Config.HTTPSKeyFile, "http-tls-key-file", "", "TLS private key file for the HTTPs server")
@ -128,6 +129,7 @@ func init() {
fs.BoolVar(&Config.AdminAPIEnabled, "api-admin-enabled", true, "If true, this node exposes the Admin API")
fs.BoolVar(&Config.KeystoreAPIEnabled, "api-keystore-enabled", true, "If true, this node exposes the Keystore API")
fs.BoolVar(&Config.MetricsAPIEnabled, "api-metrics-enabled", true, "If true, this node exposes the Metrics API")
fs.BoolVar(&Config.HealthAPIEnabled, "api-health-enabled", true, "If true, this node exposes the Health API")
fs.BoolVar(&Config.IPCEnabled, "api-ipcs-enabled", false, "If true, IPCs can be opened")
// Throughput Server
@ -269,6 +271,7 @@ func init() {
}
// HTTP:
Config.HTTPHost = *httpHost
Config.HTTPPort = uint16(*httpPort)
// Logging:

View File

@ -8,8 +8,10 @@ import (
"math"
"net"
"sync"
"sync/atomic"
"time"
"github.com/ava-labs/gecko/api/health"
"github.com/ava-labs/gecko/ids"
"github.com/ava-labs/gecko/snow/networking/router"
"github.com/ava-labs/gecko/snow/networking/sender"
@ -46,6 +48,10 @@ type Network interface {
// Thread safety must be managed internally in the network.
triggers.Acceptor
// The network should be able to report the last time the network interacted
// with a peer
health.Heartbeater
// Should only be called once, will run until either a fatal error occurs,
// or the network is closed. Returns a non-nil error.
Dispatch() error
@ -85,7 +91,8 @@ type network struct {
vdrs validators.Set // set of current validators in the AVAnet
router router.Router // router must be thread safe
clock timer.Clock
clock timer.Clock
lastHeartbeat int64
initialReconnectDelay time.Duration
maxReconnectDelay time.Duration
@ -210,6 +217,7 @@ func NewNetwork(
peers: make(map[[20]byte]*peer),
}
net.executor.Initialize()
net.heartbeat()
return net
}
@ -419,6 +427,12 @@ func (n *network) Accept(chainID, containerID ids.ID, container []byte) error {
return n.gossipContainer(chainID, containerID, container)
}
// heartbeat registers a new heartbeat to signal liveness
func (n *network) heartbeat() { atomic.StoreInt64(&n.lastHeartbeat, n.clock.Time().Unix()) }
// GetHeartbeat returns the most recent heartbeat time
func (n *network) GetHeartbeat() int64 { return atomic.LoadInt64(&n.lastHeartbeat) }
// Dispatch starts accepting connections from other nodes attempting to connect
// to this node.
func (n *network) Dispatch() error {

View File

@ -188,6 +188,8 @@ func (p *peer) send(msg Msg) bool {
// assumes the stateLock is not held
func (p *peer) handle(msg Msg) {
p.net.heartbeat()
op := msg.Op()
switch op {
case Version:

View File

@ -42,6 +42,7 @@ type Config struct {
BootstrapPeers []*Peer
// HTTP configuration
HTTPHost string
HTTPPort uint16
EnableHTTPS bool
HTTPSKeyFile string
@ -51,6 +52,7 @@ type Config struct {
AdminAPIEnabled bool
KeystoreAPIEnabled bool
MetricsAPIEnabled bool
HealthAPIEnabled bool
// Logging configuration
LoggingConfig logging.Config

View File

@ -13,9 +13,11 @@ import (
"os"
"path"
"sync"
"time"
"github.com/ava-labs/gecko/api"
"github.com/ava-labs/gecko/api/admin"
"github.com/ava-labs/gecko/api/health"
"github.com/ava-labs/gecko/api/ipcs"
"github.com/ava-labs/gecko/api/keystore"
"github.com/ava-labs/gecko/api/metrics"
@ -384,7 +386,7 @@ func (n *Node) initChains() error {
func (n *Node) initAPIServer() {
n.Log.Info("Initializing API server")
n.APIServer.Initialize(n.Log, n.LogFactory, n.Config.HTTPPort)
n.APIServer.Initialize(n.Log, n.LogFactory, n.Config.HTTPHost, n.Config.HTTPPort)
go n.Log.RecoverAndPanic(func() {
if n.Config.EnableHTTPS {
@ -465,6 +467,19 @@ func (n *Node) initAdminAPI() {
}
}
// initHealthAPI initializes the Health API service
// Assumes n.Log, n.ConsensusAPI, and n.ValidatorAPI already initialized
func (n *Node) initHealthAPI() {
if !n.Config.HealthAPIEnabled {
return
}
n.Log.Info("initializing Health API")
service := health.NewService(n.Log)
service.RegisterHeartbeat("network.validators.heartbeat", n.Net, 5*time.Minute)
n.APIServer.AddRoute(service.Handler(), &sync.RWMutex{}, "health", "", n.HTTPLog)
}
// initIPCAPI initializes the IPC API service
// Assumes n.log and n.chainManager already initialized
func (n *Node) initIPCAPI() {
@ -546,8 +561,9 @@ func (n *Node) Initialize(Config *Config, logger logging.Logger, logFactory logg
n.initEventDispatcher() // Set up the event dipatcher
n.initChainManager() // Set up the chain manager
n.initAdminAPI() // Start the Admin API
n.initIPCAPI() // Start the IPC API
n.initAdminAPI() // Start the Admin API
n.initHealthAPI() // Start the Health API
n.initIPCAPI() // Start the IPC API
if err := n.initAliases(); err != nil { // Set up aliases
return err

View File

@ -16,10 +16,11 @@ import (
"github.com/ava-labs/gecko/utils/formatting"
"github.com/ava-labs/gecko/utils/hashing"
"github.com/ava-labs/gecko/utils/json"
safemath "github.com/ava-labs/gecko/utils/math"
"github.com/ava-labs/gecko/vms/components/ava"
"github.com/ava-labs/gecko/vms/components/verify"
"github.com/ava-labs/gecko/vms/secp256k1fx"
safemath "github.com/ava-labs/gecko/utils/math"
)
var (
@ -37,6 +38,7 @@ var (
errUnknownOutputType = errors.New("unknown output type")
errUnneededAddress = errors.New("address not required to sign")
errUnknownCredentialType = errors.New("unknown credential type")
errNilTxID = errors.New("nil transaction ID")
)
// Service defines the base service for the asset vm
@ -75,10 +77,6 @@ type GetTxStatusReply struct {
Status choices.Status `json:"status"`
}
var (
errNilTxID = errors.New("nil transaction ID")
)
// GetTxStatus returns the status of the specified transaction
func (service *Service) GetTxStatus(r *http.Request, args *GetTxStatusArgs, reply *GetTxStatusReply) error {
service.vm.ctx.Log.Verbo("GetTxStatus called with %s", args.TxID)
@ -247,6 +245,7 @@ func (service *Service) GetBalance(r *http.Request, args *GetBalanceArgs, reply
return err
}
reply.UTXOIDs = make([]ava.UTXOID, 0, len(utxos))
for _, utxo := range utxos {
if !utxo.AssetID().Equals(assetID) {
continue

View File

@ -36,7 +36,7 @@ type AtomicTx interface {
// AtomicBlock being accepted results in the transaction contained in the
// block to be accepted and committed to the chain.
type AtomicBlock struct {
SingleDecisionBlock `serialize:"true"`
CommonDecisionBlock `serialize:"true"`
Tx AtomicTx `serialize:"true"`
@ -45,7 +45,7 @@ type AtomicBlock struct {
// initialize this block
func (ab *AtomicBlock) initialize(vm *VM, bytes []byte) error {
if err := ab.SingleDecisionBlock.initialize(vm, bytes); err != nil {
if err := ab.CommonDecisionBlock.initialize(vm, bytes); err != nil {
return err
}
return ab.Tx.initialize(vm)
@ -123,9 +123,6 @@ func (ab *AtomicBlock) Accept() {
ab.onAcceptFunc()
}
parent := ab.parentBlock()
// remove this block and its parent from memory
parent.free()
ab.free()
}
@ -133,11 +130,9 @@ func (ab *AtomicBlock) Accept() {
// decision block, has ID [parentID].
func (vm *VM) newAtomicBlock(parentID ids.ID, tx AtomicTx) (*AtomicBlock, error) {
ab := &AtomicBlock{
SingleDecisionBlock: SingleDecisionBlock{CommonDecisionBlock: CommonDecisionBlock{
CommonBlock: CommonBlock{
Block: core.NewBlock(parentID),
vm: vm,
},
CommonDecisionBlock: CommonDecisionBlock{CommonBlock: CommonBlock{
Block: core.NewBlock(parentID),
vm: vm,
}},
Tx: tx,
}