From 85874a3765e84d28833f338f399bc4517cb3e77a Mon Sep 17 00:00:00 2001 From: Ethan Buchman Date: Wed, 27 Jan 2016 00:27:24 -0500 Subject: [PATCH] comments; fix locks --- handlers/callbacks.go | 46 ++++++-------- handlers/handlers.go | 44 ++++++++++---- types/chain.go | 137 +++++++++++++++++++++++------------------- types/val.go | 57 +++++++++++++----- 4 files changed, 168 insertions(+), 116 deletions(-) diff --git a/handlers/callbacks.go b/handlers/callbacks.go index ef88371b..d9ccb520 100644 --- a/handlers/callbacks.go +++ b/handlers/callbacks.go @@ -4,50 +4,38 @@ import ( "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-event-meter" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-events" + "github.com/tendermint/netmon/types" + tmtypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/types" ) +/* + Each chain-validator gets an eventmeter which maintains the websocket + Certain pre-defined events may update the netmon state: latency pongs, new blocks + TODO: config changes for new validators and changing ip/port +*/ + // implements eventmeter.EventCallbackFunc -func (tn *TendermintNetwork) newBlockCallback(chainID, valID string) eventmeter.EventCallbackFunc { +// updates validator and possibly chain with new block +func (tn *TendermintNetwork) newBlockCallback(chainState *types.ChainState, val *types.ValidatorState) eventmeter.EventCallbackFunc { return func(metric *eventmeter.EventMetric, data events.EventData) { block := data.(tmtypes.EventDataNewBlock).Block - tn.mtx.Lock() - defer tn.mtx.Unlock() - - // grab chain and validator - chain := tn.Chains[chainID] - val, _ := chain.Config.GetValidatorByID(valID) + // these functions are thread safe + // we should run them concurrently // update height for validator - val.Status.BlockHeight = block.Header.Height + val.NewBlock(block) // possibly update height and mean block time for chain - if block.Header.Height > chain.Status.Height { - chain.Status.NewBlock(block) - } - + chainState.NewBlock(block) } } // implements eventmeter.EventLatencyFunc -func (tn *TendermintNetwork) latencyCallback(chainID, valID string) eventmeter.LatencyCallbackFunc { +func (tn *TendermintNetwork) latencyCallback(chain *types.ChainState, val *types.ValidatorState) eventmeter.LatencyCallbackFunc { return func(latency float64) { - tn.mtx.Lock() - defer tn.mtx.Unlock() - - // grab chain and validator - chain := tn.Chains[chainID] - val, _ := chain.Config.GetValidatorByID(valID) - - // update latency for this validator and avg latency for chain - mean := chain.Status.MeanLatency * float64(chain.Status.NumValidators) - mean = (mean - val.Status.Latency + latency) / float64(chain.Status.NumValidators) - val.Status.Latency = latency - chain.Status.MeanLatency = mean - - // TODO: possibly update active nodes and uptime for chain - chain.Status.ActiveValidators = chain.Status.NumValidators // XXX - + oldLatency := val.UpdateLatency(latency) + chain.UpdateLatency(oldLatency, latency) } } diff --git a/handlers/handlers.go b/handlers/handlers.go index f774355f..08392a8a 100644 --- a/handlers/handlers.go +++ b/handlers/handlers.go @@ -20,8 +20,9 @@ var _ = wire.RegisterInterface( struct{ NetMonResult }{}, wire.ConcreteType{&types.ChainAndValidatorSetIDs{}, 0x01}, wire.ConcreteType{&types.ChainState{}, 0x02}, - wire.ConcreteType{&types.Validator{}, 0x03}, - wire.ConcreteType{&eventmeter.EventMetric{}, 0x04}, + wire.ConcreteType{&types.ValidatorSet{}, 0x10}, + wire.ConcreteType{&types.Validator{}, 0x11}, + wire.ConcreteType{&eventmeter.EventMetric{}, 0x20}, ) //--------------------------------------------- @@ -33,15 +34,11 @@ type TendermintNetwork struct { ValSets map[string]*types.ValidatorSet `json:"validator_sets"` } -// TODO: populate validator sets -func NewTendermintNetwork(chains ...*types.ChainState) *TendermintNetwork { +func NewTendermintNetwork() *TendermintNetwork { network := &TendermintNetwork{ Chains: make(map[string]*types.ChainState), ValSets: make(map[string]*types.ValidatorSet), } - for _, chain := range chains { - network.Chains[chain.Config.ID] = chain - } return network } @@ -58,8 +55,12 @@ func (tn *TendermintNetwork) Stop() { } } -//------------ +//----------------------------------------------------------- // RPC funcs +//----------------------------------------------------------- + +//------------------ +// Status // Returns sorted lists of all chains and validator sets func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) { @@ -86,6 +87,11 @@ func (tn *TendermintNetwork) Status() (*types.ChainAndValidatorSetIDs, error) { } +// NOTE: returned values should not be manipulated by callers as they are pointers to the state! +//------------------ +// Blockchains + +// Get the current state of a chain func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error) { tn.mtx.Lock() defer tn.mtx.Unlock() @@ -96,9 +102,10 @@ func (tn *TendermintNetwork) GetChain(chainID string) (*types.ChainState, error) return chain, nil } +// Register a new chain on the network. +// For each validator, start a websocket connection to listen for new block events and record latency func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) (*types.ChainState, error) { - tn.mtx.Lock() - defer tn.mtx.Unlock() + // Don't bother locking until we touch the TendermintNetwork object chainState := &types.ChainState{ Config: chainConfig, @@ -106,6 +113,7 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) } chainState.Status.NumValidators = len(chainConfig.Validators) + // so we can easily lookup validators by id rather than index chainState.Config.PopulateValIDMap() // start the event meter and listen for new blocks on each validator @@ -115,19 +123,27 @@ func (tn *TendermintNetwork) RegisterChain(chainConfig *types.BlockchainConfig) if err := v.Start(); err != nil { return nil, err } - v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainConfig.ID, v.Config.Validator.ID)) - err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainConfig.ID, v.Config.Validator.ID)) + + v.EventMeter().RegisterLatencyCallback(tn.latencyCallback(chainState, v)) + err := v.EventMeter().Subscribe(tmtypes.EventStringNewBlock(), tn.newBlockCallback(chainState, v)) if err != nil { return nil, err } // get/set the validator's pub key + // TODO: possibly remove? why should we depend on this here? v.PubKey() } + + tn.mtx.Lock() + defer tn.mtx.Unlock() tn.Chains[chainState.Config.ID] = chainState return chainState, nil } +//------------------ +// Validators + func (tn *TendermintNetwork) GetValidatorSet(valSetID string) (*types.ValidatorSet, error) { tn.mtx.Lock() defer tn.mtx.Unlock() @@ -159,6 +175,9 @@ func (tn *TendermintNetwork) GetValidator(valSetID, valID string) (*types.Valida return val, nil } +//------------------ +// Event metering + func (tn *TendermintNetwork) StartMeter(chainID, valID, eventID string) error { tn.mtx.Lock() defer tn.mtx.Unlock() @@ -190,6 +209,7 @@ func (tn *TendermintNetwork) GetMeter(chainID, valID, eventID string) (*eventmet return val.EventMeter().GetMetric(eventID) } +// assumes lock is held func (tn *TendermintNetwork) getChainVal(chainID, valID string) (*types.ValidatorState, error) { chain, ok := tn.Chains[chainID] if !ok { diff --git a/types/chain.go b/types/chain.go index d1813914..974ce4ad 100644 --- a/types/chain.go +++ b/types/chain.go @@ -1,9 +1,7 @@ package types import ( - "encoding/json" "fmt" - "io/ioutil" "sync" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/rcrowley/go-metrics" @@ -12,75 +10,50 @@ import ( //------------------------------------------------ // blockchain types +//------------------------------------------------ // Known chain and validator set IDs (from which anything else can be found) +// Returned by the Status RPC type ChainAndValidatorSetIDs struct { ChainIDs []string `json:"chain_ids"` ValidatorSetIDs []string `json:"validator_set_ids"` } -// Basic chain and network metrics -type BlockchainStatus struct { - // Blockchain Info - Height int `json:"height"` - BlockchainSize int64 `json:"blockchain_size"` // how might we get StateSize ? - MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"` - TxThroughput float64 `json:"tx_throughput" wire:"unsafe"` - - blockTimeMeter metrics.Meter - txThroughputMeter metrics.Meter - - // Network Info - NumValidators int `json:"num_validators"` - ActiveValidators int `json:"active_validators"` - ActiveNodes int `json:"active_nodes"` - MeanLatency float64 `json:"mean_latency" wire:"unsafe"` - Uptime float64 `json:"uptime" wire:"unsafe"` - - // TODO: charts for block time, latency (websockets/event-meter ?) -} - -func NewBlockchainStatus() *BlockchainStatus { - return &BlockchainStatus{ - blockTimeMeter: metrics.NewMeter(), - txThroughputMeter: metrics.NewMeter(), - } -} - -func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) { - s.Height = block.Header.Height - s.blockTimeMeter.Mark(1) - s.txThroughputMeter.Mark(int64(block.Header.NumTxs)) - s.MeanBlockTime = 1 / s.blockTimeMeter.RateMean() - s.TxThroughput = s.txThroughputMeter.RateMean() -} +//------------------------------------------------ +// chain state // Main chain state -// Returned over RPC but also used to manage state +// Returned over RPC; also used to manage state type ChainState struct { Config *BlockchainConfig `json:"config"` Status *BlockchainStatus `json:"status"` } -// chain config without ValidatorState -type BlockchainBaseConfig struct { - ID string `json:"id"` - ValSetID string `json:"val_set_id"` - Validators []*ValidatorConfig `json:"validators"` +func (cs *ChainState) NewBlock(block *tmtypes.Block) { + cs.Status.NewBlock(block) } -// basic chain config -// threadsafe -type BlockchainConfig struct { - ID string `json:"id"` - ValSetID string `json:"val_set_id"` +func (cs *ChainState) UpdateLatency(oldLatency, newLatency float64) { + cs.Status.UpdateLatency(oldLatency, newLatency) +} +//------------------------------------------------ +// Blockchain Config: id, validator config + +// Chain Config +type BlockchainConfig struct { + // should be fixed for life of chain + ID string `json:"id"` + ValSetID string `json:"val_set_id"` // NOTE: do we really commit to one val set per chain? + + // handles live validator states (latency, last block, etc) + // and validator set changes mtx sync.Mutex Validators []*ValidatorState `json:"validators"` // TODO: this should be ValidatorConfig and the state in BlockchainStatus valIDMap map[string]int // map IDs to indices } -// So we can fetch validator by id +// So we can fetch validator by id rather than index func (bc *BlockchainConfig) PopulateValIDMap() { bc.mtx.Lock() defer bc.mtx.Unlock() @@ -100,19 +73,61 @@ func (bc *BlockchainConfig) GetValidatorByID(valID string) (*ValidatorState, err return bc.Validators[valIndex], nil } -func LoadChainFromFile(configFile string) (*BlockchainConfig, error) { +//------------------------------------------------ +// BlockchainStatus - b, err := ioutil.ReadFile(configFile) - if err != nil { - return nil, err - } +// Basic blockchain metrics +type BlockchainStatus struct { + mtx sync.Mutex - // for now we start with one blockchain loaded from file; - // eventually more can be uploaded or created through endpoints - chainConfig := new(BlockchainConfig) - if err := json.Unmarshal(b, chainConfig); err != nil { - return nil, err - } + // Blockchain Info + Height int `json:"height"` + BlockchainSize int64 `json:"blockchain_size"` + MeanBlockTime float64 `json:"mean_block_time" wire:"unsafe"` + TxThroughput float64 `json:"tx_throughput" wire:"unsafe"` - return chainConfig, nil + blockTimeMeter metrics.Meter + txThroughputMeter metrics.Meter + + // Network Info + NumValidators int `json:"num_validators"` + ActiveValidators int `json:"active_validators"` + ActiveNodes int `json:"active_nodes"` + MeanLatency float64 `json:"mean_latency" wire:"unsafe"` + Uptime float64 `json:"uptime" wire:"unsafe"` + + // What else can we get / do we want? + // TODO: charts for block time, latency (websockets/event-meter ?) +} + +func NewBlockchainStatus() *BlockchainStatus { + return &BlockchainStatus{ + blockTimeMeter: metrics.NewMeter(), + txThroughputMeter: metrics.NewMeter(), + } +} + +func (s *BlockchainStatus) NewBlock(block *tmtypes.Block) { + s.mtx.Lock() + defer s.mtx.Unlock() + if block.Header.Height > s.Height { + s.Height = block.Header.Height + s.blockTimeMeter.Mark(1) + s.txThroughputMeter.Mark(int64(block.Header.NumTxs)) + s.MeanBlockTime = 1 / s.blockTimeMeter.RateMean() + s.TxThroughput = s.txThroughputMeter.RateMean() + } +} + +func (s *BlockchainStatus) UpdateLatency(oldLatency, newLatency float64) { + s.mtx.Lock() + defer s.mtx.Unlock() + + // update latency for this validator and avg latency for chain + mean := s.MeanLatency * float64(s.NumValidators) + mean = (mean - oldLatency + newLatency) / float64(s.NumValidators) + s.MeanLatency = mean + + // TODO: possibly update active nodes and uptime for chain + s.ActiveValidators = s.NumValidators // XXX } diff --git a/types/val.go b/types/val.go index f3c63598..b25d699a 100644 --- a/types/val.go +++ b/types/val.go @@ -3,7 +3,7 @@ package types import ( "encoding/json" "fmt" - "reflect" + "sync" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-crypto" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-event-meter" @@ -11,10 +11,15 @@ import ( client "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-rpc/client" "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/go-wire" ctypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/netmon/Godeps/_workspace/src/github.com/tendermint/tendermint/types" ) //------------------------------------------------ // validator types +//------------------------------------------------ + +//------------------------------------------------ +// simple validator set and validator (just crypto, no network) // validator set (independent of chains) type ValidatorSet struct { @@ -38,25 +43,19 @@ type Validator struct { Chains []string `json:"chains,omitempty"` // TODO: put this elsewhere (?) } -type ValidatorConfig struct { - Validator *Validator `json:"validator"` - P2PAddr string `json:"p2p_addr"` - RPCAddr string `json:"rpc_addr"` - Index int `json:"index,omitempty"` -} - -type ValidatorStatus struct { - Latency float64 `json:"latency" wire:"unsafe"` - BlockHeight int `json:"block_height"` -} +//------------------------------------------------ +// Live validator on a chain // Validator on a chain -// Responsible for communication with the validator // Returned over RPC but also used to manage state +// Responsible for communication with the validator type ValidatorState struct { Config *ValidatorConfig `json:"config"` Status *ValidatorStatus `json:"status"` + // Currently we get IPs and dial, + // but should reverse so the nodes dial the netmon, + // both for node privacy and easier reconfig (validators changing ip/port) em *eventmeter.EventMeter // holds a ws connection to the val client *client.ClientURI // rpc client } @@ -81,8 +80,23 @@ func (vs *ValidatorState) EventMeter() *eventmeter.EventMeter { return vs.em } +func (vs *ValidatorState) NewBlock(block *tmtypes.Block) { + vs.Status.mtx.Lock() + defer vs.Status.mtx.Unlock() + vs.Status.BlockHeight = block.Header.Height +} + +func (vs *ValidatorState) UpdateLatency(latency float64) float64 { + vs.Status.mtx.Lock() + defer vs.Status.mtx.Unlock() + old := vs.Status.Latency + vs.Status.Latency = latency + return old +} + // Return the validators pubkey. If it's not yet set, get it from the node // TODO: proof that it's the node's key +// XXX: Is this necessary? Why would it not be set func (vs *ValidatorState) PubKey() crypto.PubKey { if vs.Config.Validator.PubKey != nil { return vs.Config.Validator.PubKey @@ -99,6 +113,20 @@ func (vs *ValidatorState) PubKey() crypto.PubKey { return vs.Config.Validator.PubKey } +type ValidatorConfig struct { + mtx sync.Mutex + Validator *Validator `json:"validator"` + P2PAddr string `json:"p2p_addr"` + RPCAddr string `json:"rpc_addr"` + Index int `json:"index,omitempty"` +} + +type ValidatorStatus struct { + mtx sync.Mutex + Latency float64 `json:"latency" wire:"unsafe"` + BlockHeight int `json:"block_height"` +} + //--------------------------------------------------- // utilities @@ -112,7 +140,8 @@ func UnmarshalEvent(b json.RawMessage) (string, events.EventData, error) { } event, ok := (*result).(*ctypes.ResultEvent) if !ok { - return "", nil, fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result)) + return "", nil, nil // TODO: handle non-event messages (ie. return from subscribe/unsubscribe) + // fmt.Errorf("Result is not type *ctypes.ResultEvent. Got %v", reflect.TypeOf(*result)) } return event.Name, event.Data, nil