Merge pull request #266 from tendermint/peer_filter

Peer filter
This commit is contained in:
Ethan Buchman 2016-09-09 20:00:53 -04:00 committed by GitHub
commit 398428d711
15 changed files with 361 additions and 102 deletions

View File

@ -44,7 +44,7 @@ type BlockchainReactor struct {
sw *p2p.Switch
state *sm.State
proxyAppConn proxy.AppConn // same as consensus.proxyAppConn
proxyAppConn proxy.AppConnConsensus // same as consensus.proxyAppConn
store *BlockStore
pool *BlockPool
fastSync bool
@ -55,7 +55,7 @@ type BlockchainReactor struct {
evsw *events.EventSwitch
}
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConn, store *BlockStore, fastSync bool) *BlockchainReactor {
func NewBlockchainReactor(state *sm.State, proxyAppConn proxy.AppConnConsensus, store *BlockStore, fastSync bool) *BlockchainReactor {
if state.LastBlockHeight == store.Height()-1 {
store.height -= 1 // XXX HACK, make this better
}

View File

@ -70,6 +70,7 @@ func GetConfig(rootDir string) cfg.Config {
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("filter_peers", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("disable_data_hash", false)

View File

@ -83,6 +83,7 @@ func ResetConfig(localPath string) cfg.Config {
mapConfig.SetDefault("revision_file", rootDir+"/revision")
mapConfig.SetDefault("cswal", rootDir+"/data/cswal")
mapConfig.SetDefault("cswal_light", false)
mapConfig.SetDefault("filter_peers", false)
mapConfig.SetDefault("block_size", 10000)
mapConfig.SetDefault("disable_data_hash", false)

View File

@ -215,7 +215,7 @@ type ConsensusState struct {
QuitService
config cfg.Config
proxyAppConn proxy.AppConn
proxyAppConn proxy.AppConnConsensus
blockStore *bc.BlockStore
mempool *mempl.Mempool
privValidator *types.PrivValidator
@ -238,7 +238,7 @@ type ConsensusState struct {
nSteps int // used for testing to limit the number of transitions the state makes
}
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConn, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
func NewConsensusState(config cfg.Config, state *sm.State, proxyAppConn proxy.AppConnConsensus, blockStore *bc.BlockStore, mempool *mempl.Mempool) *ConsensusState {
cs := &ConsensusState{
config: config,
proxyAppConn: proxyAppConn,
@ -1283,7 +1283,7 @@ func (cs *ConsensusState) commitStateUpdateMempool(s *sm.State, block *types.Blo
defer cs.mempool.Unlock()
// flush out any CheckTx that have already started
cs.proxyAppConn.FlushSync()
// cs.proxyAppConn.FlushSync() // ?! XXX
// Commit block, get hash back
res := cs.proxyAppConn.CommitSync()

2
glide.lock generated
View File

@ -70,7 +70,7 @@ imports:
- name: github.com/tendermint/go-merkle
version: 05042c6ab9cad51d12e4cecf717ae68e3b1409a8
- name: github.com/tendermint/go-p2p
version: 929cf433b9c8e987af5f7f3ca3ce717e1e3eda53
version: 642901d5aae311368d91ebfb888a4f6877de9614
subpackages:
- upnp
- name: github.com/tendermint/go-rpc

View File

@ -49,7 +49,7 @@ type Mempool struct {
config cfg.Config
proxyMtx sync.Mutex
proxyAppConn proxy.AppConn
proxyAppConn proxy.AppConnMempool
txs *clist.CList // concurrent linked-list of good txs
counter int64 // simple incrementing counter
height int // the last block Update()'d to
@ -63,7 +63,7 @@ type Mempool struct {
cacheList *list.List // to remove oldest tx when cache gets too big
}
func NewMempool(config cfg.Config, proxyAppConn proxy.AppConn) *Mempool {
func NewMempool(config cfg.Config, proxyAppConn proxy.AppConnMempool) *Mempool {
mempool := &Mempool{
config: config,
proxyAppConn: proxyAppConn,

View File

@ -6,7 +6,6 @@ import (
"net"
"net/http"
"strings"
"sync"
"time"
. "github.com/tendermint/go-common"
@ -26,9 +25,6 @@ import (
sm "github.com/tendermint/tendermint/state"
"github.com/tendermint/tendermint/types"
"github.com/tendermint/tendermint/version"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
"github.com/tendermint/tmsp/example/nil"
)
import _ "net/http/pprof"
@ -45,9 +41,10 @@ type Node struct {
privValidator *types.PrivValidator
genesisDoc *types.GenesisDoc
privKey crypto.PrivKeyEd25519
proxyApp proxy.AppConns
}
func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp func(proxyAddr, transport string, appHash []byte) proxy.AppConn) *Node {
func NewNode(config cfg.Config, privValidator *types.PrivValidator) *Node {
EnsureDir(config.GetString("db_dir"), 0700) // incase we use memdb, cswal still gets written here
@ -61,12 +58,9 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// Get State
state := getState(config, stateDB)
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
proxyAppConnMempool := getProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := getProxyApp(proxyAddr, transport, state.AppHash)
// Create the proxyApp, which houses three connections:
// query, consensus, and mempool
proxyApp := proxy.NewAppConns(config, state, blockStore)
// add the chainid and number of validators to the global config
config.Set("chain_id", state.ChainID)
@ -93,14 +87,14 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
}
// Make BlockchainReactor
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyAppConnConsensus, blockStore, fastSync)
bcReactor := bc.NewBlockchainReactor(state.Copy(), proxyApp.Consensus(), blockStore, fastSync)
// Make MempoolReactor
mempool := mempl.NewMempool(config, proxyAppConnMempool)
mempool := mempl.NewMempool(config, proxyApp.Mempool())
mempoolReactor := mempl.NewMempoolReactor(config, mempool)
// Make ConsensusReactor
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusReactor := consensus.NewConsensusReactor(consensusState, blockStore, fastSync)
if privValidator != nil {
consensusReactor.SetPrivValidator(privValidator)
@ -118,6 +112,26 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
sw.AddReactor("BLOCKCHAIN", bcReactor)
sw.AddReactor("CONSENSUS", consensusReactor)
// filter peers by addr or pubkey with a tmsp query.
// if the query return code is OK, add peer
// XXX: query format subject to change
if config.GetBool("filter_peers") {
sw.SetAddrFilter(func(addr net.Addr) error {
res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/addr/%s", addr.String())))
if res.IsOK() {
return nil
}
return res
})
sw.SetPubKeyFilter(func(pubkey crypto.PubKeyEd25519) error {
res := proxyApp.Query().QuerySync([]byte(Fmt("p2p/filter/pubkey/%X", pubkey.Bytes())))
if res.IsOK() {
return nil
}
return res
})
}
// add the event switch to all services
// they should all satisfy events.Eventable
SetEventSwitch(eventSwitch, bcReactor, mempoolReactor, consensusReactor)
@ -125,6 +139,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
// run the profile server
profileHost := config.GetString("prof_laddr")
if profileHost != "" {
go func() {
log.Warn("Profile server", "error", http.ListenAndServe(profileHost, nil))
}()
@ -142,6 +157,7 @@ func NewNode(config cfg.Config, privValidator *types.PrivValidator, getProxyApp
privValidator: privValidator,
genesisDoc: state.GenesisDoc,
privKey: privKey,
proxyApp: proxyApp,
}
}
@ -270,40 +286,6 @@ func makeNodeInfo(config cfg.Config, sw *p2p.Switch, privKey crypto.PrivKeyEd255
return nodeInfo
}
// Get a connection to the proxyAppConn addr.
// Check the current hash, and panic if it doesn't match.
func GetProxyApp(addr, transport string, hash []byte) (proxyAppConn proxy.AppConn) {
// use local app (for testing)
switch addr {
case "nilapp":
app := nilapp.NewNilApplication()
mtx := new(sync.Mutex)
proxyAppConn = tmspcli.NewLocalClient(mtx, app)
case "dummy":
app := dummy.NewDummyApplication()
mtx := new(sync.Mutex)
proxyAppConn = tmspcli.NewLocalClient(mtx, app)
default:
// Run forever in a loop
remoteApp, err := proxy.NewRemoteAppConn(addr, transport)
if err != nil {
Exit(Fmt("Failed to connect to proxy for mempool: %v", err))
}
proxyAppConn = remoteApp
}
// Check the hash
res := proxyAppConn.CommitSync()
if res.IsErr() {
PanicCrisis(Fmt("Error in getting proxyAppConn hash: %v", res))
}
if !bytes.Equal(hash, res.Data) {
log.Warn(Fmt("ProxyApp hash does not match. Expected %X, got %X", hash, res.Data))
}
return proxyAppConn
}
// Load the most recent state from "state" db,
// or create a new one (and save) from genesis.
func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
@ -319,7 +301,7 @@ func getState(config cfg.Config, stateDB dbm.DB) *sm.State {
// Users wishing to use an external signer for their validators
// should fork tendermint/tendermint and implement RunNode to
// load their custom priv validator and call NewNode(privVal, getProxyFunc)
// load their custom priv validator and call NewNode
func RunNode(config cfg.Config) {
// Wait until the genesis doc becomes available
genDocFile := config.GetString("genesis_file")
@ -347,7 +329,7 @@ func RunNode(config cfg.Config) {
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
// Create & start node
n := NewNode(config, privValidator, GetProxyApp)
n := NewNode(config, privValidator)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
@ -402,10 +384,7 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
// Create two proxyAppConn connections,
// one for the consensus and one for the mempool.
proxyAddr := config.GetString("proxy_app")
transport := config.GetString("tmsp")
proxyAppConnMempool := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyAppConnConsensus := GetProxyApp(proxyAddr, transport, state.AppHash)
proxyApp := proxy.NewAppConns(config, state, blockStore)
// add the chainid to the global config
config.Set("chain_id", state.ChainID)
@ -417,9 +396,9 @@ func newConsensusState(config cfg.Config) *consensus.ConsensusState {
Exit(Fmt("Failed to start event switch: %v", err))
}
mempool := mempl.NewMempool(config, proxyAppConnMempool)
mempool := mempl.NewMempool(config, proxyApp.Mempool())
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyAppConnConsensus, blockStore, mempool)
consensusState := consensus.NewConsensusState(config, state.Copy(), proxyApp.Consensus(), blockStore, mempool)
consensusState.SetEventSwitch(eventSwitch)
return consensusState
}

View File

@ -17,7 +17,7 @@ func TestNodeStartStop(t *testing.T) {
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
// Create & start node
n := NewNode(config, privValidator, GetProxyApp)
n := NewNode(config, privValidator)
protocol, address := ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, config.GetBool("skip_upnp"))
n.AddListener(l)

View File

@ -2,8 +2,139 @@ package proxy
import (
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/types"
)
type AppConn interface {
tmspcli.Client
//----------------------------------------------------------------------------------------
// Enforce which tmsp msgs can be sent on a connection at the type level
type AppConnConsensus interface {
SetResponseCallback(tmspcli.Callback)
Error() error
InitChainSync(validators []*types.Validator) (err error)
BeginBlockSync(height uint64) (err error)
AppendTxAsync(tx []byte) *tmspcli.ReqRes
EndBlockSync(height uint64) (changedValidators []*types.Validator, err error)
CommitSync() (res types.Result)
}
type AppConnMempool interface {
SetResponseCallback(tmspcli.Callback)
Error() error
CheckTxAsync(tx []byte) *tmspcli.ReqRes
FlushAsync() *tmspcli.ReqRes
FlushSync() error
}
type AppConnQuery interface {
Error() error
EchoSync(string) (res types.Result)
InfoSync() (res types.Result)
QuerySync(tx []byte) (res types.Result)
// SetOptionSync(key string, value string) (res types.Result)
}
//-----------------------------------------------------------------------------------------
// Implements AppConnConsensus (subset of tmspcli.Client)
type appConnConsensus struct {
appConn tmspcli.Client
}
func NewAppConnConsensus(appConn tmspcli.Client) *appConnConsensus {
return &appConnConsensus{
appConn: appConn,
}
}
func (app *appConnConsensus) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnConsensus) Error() error {
return app.appConn.Error()
}
func (app *appConnConsensus) InitChainSync(validators []*types.Validator) (err error) {
return app.appConn.InitChainSync(validators)
}
func (app *appConnConsensus) BeginBlockSync(height uint64) (err error) {
return app.appConn.BeginBlockSync(height)
}
func (app *appConnConsensus) AppendTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.AppendTxAsync(tx)
}
func (app *appConnConsensus) EndBlockSync(height uint64) (changedValidators []*types.Validator, err error) {
return app.appConn.EndBlockSync(height)
}
func (app *appConnConsensus) CommitSync() (res types.Result) {
return app.appConn.CommitSync()
}
//------------------------------------------------
// Implements AppConnMempool (subset of tmspcli.Client)
type appConnMempool struct {
appConn tmspcli.Client
}
func NewAppConnMempool(appConn tmspcli.Client) *appConnMempool {
return &appConnMempool{
appConn: appConn,
}
}
func (app *appConnMempool) SetResponseCallback(cb tmspcli.Callback) {
app.appConn.SetResponseCallback(cb)
}
func (app *appConnMempool) Error() error {
return app.appConn.Error()
}
func (app *appConnMempool) FlushAsync() *tmspcli.ReqRes {
return app.appConn.FlushAsync()
}
func (app *appConnMempool) FlushSync() error {
return app.appConn.FlushSync()
}
func (app *appConnMempool) CheckTxAsync(tx []byte) *tmspcli.ReqRes {
return app.appConn.CheckTxAsync(tx)
}
//------------------------------------------------
// Implements AppConnQuery (subset of tmspcli.Client)
type appConnQuery struct {
appConn tmspcli.Client
}
func NewAppConnQuery(appConn tmspcli.Client) *appConnQuery {
return &appConnQuery{
appConn: appConn,
}
}
func (app *appConnQuery) Error() error {
return app.appConn.Error()
}
func (app *appConnQuery) EchoSync(msg string) (res types.Result) {
return app.appConn.EchoSync(msg)
}
func (app *appConnQuery) InfoSync() (res types.Result) {
return app.appConn.InfoSync()
}
func (app *appConnQuery) QuerySync(tx []byte) (res types.Result) {
return app.appConn.QuerySync(tx)
}

View File

@ -5,10 +5,42 @@ import (
"testing"
. "github.com/tendermint/go-common"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
"github.com/tendermint/tmsp/server"
"github.com/tendermint/tmsp/types"
)
//----------------------------------------
type AppConnTest interface {
EchoAsync(string) *tmspcli.ReqRes
FlushSync() error
InfoSync() (res types.Result)
}
type appConnTest struct {
appConn tmspcli.Client
}
func NewAppConnTest(appConn tmspcli.Client) AppConnTest {
return &appConnTest{appConn}
}
func (app *appConnTest) EchoAsync(msg string) *tmspcli.ReqRes {
return app.appConn.EchoAsync(msg)
}
func (app *appConnTest) FlushSync() error {
return app.appConn.FlushSync()
}
func (app *appConnTest) InfoSync() types.Result {
return app.appConn.InfoSync()
}
//----------------------------------------
var SOCKET = "socket"
func TestEcho(t *testing.T) {
@ -21,12 +53,12 @@ func TestEcho(t *testing.T) {
}
defer s.Stop()
// Start client
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
cli, err := NewTMSPClient(sockPath, SOCKET)
if err != nil {
Exit(err.Error())
} else {
t.Log("Connected")
}
proxy := NewAppConnTest(cli)
t.Log("Connected")
for i := 0; i < 1000; i++ {
proxy.EchoAsync(Fmt("echo-%v", i))
@ -44,12 +76,12 @@ func BenchmarkEcho(b *testing.B) {
}
defer s.Stop()
// Start client
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
cli, err := NewTMSPClient(sockPath, SOCKET)
if err != nil {
Exit(err.Error())
} else {
b.Log("Connected")
}
proxy := NewAppConnTest(cli)
b.Log("Connected")
echoString := strings.Repeat(" ", 200)
b.StartTimer() // Start benchmarking tests
@ -72,12 +104,13 @@ func TestInfo(t *testing.T) {
}
defer s.Stop()
// Start client
proxy, err := NewRemoteAppConn(sockPath, SOCKET)
cli, err := NewTMSPClient(sockPath, SOCKET)
if err != nil {
Exit(err.Error())
} else {
t.Log("Connected")
}
proxy := NewAppConnTest(cli)
t.Log("Connected")
res := proxy.InfoSync()
if res.IsErr() {
t.Errorf("Unexpected error: %v", err)

128
proxy/multi_app_conn.go Normal file
View File

@ -0,0 +1,128 @@
package proxy
import (
"fmt"
"sync"
. "github.com/tendermint/go-common"
cfg "github.com/tendermint/go-config"
tmspcli "github.com/tendermint/tmsp/client"
"github.com/tendermint/tmsp/example/dummy"
nilapp "github.com/tendermint/tmsp/example/nil"
)
// Get a connected tmsp client
func NewTMSPClient(addr, transport string) (tmspcli.Client, error) {
var client tmspcli.Client
// use local app (for testing)
// TODO: local proxy app conn
switch addr {
case "nilapp":
app := nilapp.NewNilApplication()
mtx := new(sync.Mutex) // TODO
client = tmspcli.NewLocalClient(mtx, app)
case "dummy":
app := dummy.NewDummyApplication()
mtx := new(sync.Mutex) // TODO
client = tmspcli.NewLocalClient(mtx, app)
default:
// Run forever in a loop
mustConnect := false
remoteApp, err := tmspcli.NewClient(addr, transport, mustConnect)
if err != nil {
return nil, fmt.Errorf("Failed to connect to proxy for mempool: %v", err)
}
client = remoteApp
}
return client, nil
}
//---------
type AppConns interface {
Mempool() AppConnMempool
Consensus() AppConnConsensus
Query() AppConnQuery
}
func NewAppConns(config cfg.Config, state State, blockStore BlockStore) AppConns {
return NewMultiAppConn(config, state, blockStore)
}
// a multiAppConn is made of a few appConns (mempool, consensus)
// and manages their underlying tmsp clients, ensuring they reboot together
type multiAppConn struct {
QuitService
config cfg.Config
state State
blockStore BlockStore
mempoolConn *appConnMempool
consensusConn *appConnConsensus
queryConn *appConnQuery
}
// Make all necessary tmsp connections to the application
func NewMultiAppConn(config cfg.Config, state State, blockStore BlockStore) *multiAppConn {
multiAppConn := &multiAppConn{
config: config,
state: state,
blockStore: blockStore,
}
multiAppConn.QuitService = *NewQuitService(log, "multiAppConn", multiAppConn)
multiAppConn.Start()
return multiAppConn
}
// Returns the mempool connection
func (app *multiAppConn) Mempool() AppConnMempool {
return app.mempoolConn
}
// Returns the consensus Connection
func (app *multiAppConn) Consensus() AppConnConsensus {
return app.consensusConn
}
func (app *multiAppConn) Query() AppConnQuery {
return app.queryConn
}
func (app *multiAppConn) OnStart() error {
app.QuitService.OnStart()
addr := app.config.GetString("proxy_app")
transport := app.config.GetString("tmsp")
// query connection
querycli, err := NewTMSPClient(addr, transport)
if err != nil {
return err
}
app.queryConn = NewAppConnQuery(querycli)
// mempool connection
memcli, err := NewTMSPClient(addr, transport)
if err != nil {
return err
}
app.mempoolConn = NewAppConnMempool(memcli)
// consensus connection
concli, err := NewTMSPClient(addr, transport)
if err != nil {
return err
}
app.consensusConn = NewAppConnConsensus(concli)
// TODO: handshake
// TODO: replay blocks
// TODO: (on restart) replay mempool
return nil
}

View File

@ -1,23 +0,0 @@
package proxy
import (
tmspcli "github.com/tendermint/tmsp/client"
)
// This is goroutine-safe, but users should beware that
// the application in general is not meant to be interfaced
// with concurrent callers.
type remoteAppConn struct {
tmspcli.Client
}
func NewRemoteAppConn(addr, transport string) (*remoteAppConn, error) {
client, err := tmspcli.NewClient(addr, transport, false)
if err != nil {
return nil, err
}
appConn := &remoteAppConn{
Client: client,
}
return appConn, nil
}

9
proxy/state.go Normal file
View File

@ -0,0 +1,9 @@
package proxy
type State interface {
// TODO
}
type BlockStore interface {
// TODO
}

View File

@ -54,7 +54,7 @@ func newNode(ready chan struct{}) {
// Create & start node
privValidatorFile := config.GetString("priv_validator_file")
privValidator := types.LoadOrGenPrivValidator(privValidatorFile)
node = nm.NewNode(config, privValidator, nm.GetProxyApp)
node = nm.NewNode(config, privValidator)
protocol, address := nm.ProtocolAndAddress(config.GetString("node_laddr"))
l := p2p.NewDefaultListener(protocol, address, true)
node.AddListener(l)

View File

@ -18,7 +18,7 @@ func (s *State) ValidateBlock(block *types.Block) error {
// Execute the block to mutate State.
// Validates block and then executes Data.Txs in the block.
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block, blockPartsHeader types.PartSetHeader) error {
func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block, blockPartsHeader types.PartSetHeader) error {
// Validate the block.
err := s.validateBlock(block)
@ -55,7 +55,7 @@ func (s *State) ExecBlock(eventCache events.Fireable, proxyAppConn proxy.AppConn
// Executes block's transactions on proxyAppConn.
// TODO: Generate a bitmap or otherwise store tx validity in state.
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConn, block *types.Block) error {
func (s *State) execBlockOnProxyApp(eventCache events.Fireable, proxyAppConn proxy.AppConnConsensus, block *types.Block) error {
var validTxs, invalidTxs = 0, 0