tendermint/crawler/crawl.go

337 lines
7.8 KiB
Go
Raw Normal View History

2015-04-20 02:13:04 -07:00
package crawler
import (
"fmt"
2015-04-20 14:00:19 -07:00
"github.com/tendermint/tendermint/binary"
2015-04-20 02:13:04 -07:00
rpctypes "github.com/tendermint/tendermint/rpc/core/types"
rpcclient "github.com/tendermint/tendermint/rpc/core_client"
2015-04-20 14:00:19 -07:00
"github.com/tendermint/tendermint/types"
2015-04-20 02:13:04 -07:00
"time"
2015-04-20 14:00:19 -07:00
"io/ioutil"
2015-04-20 02:13:04 -07:00
)
const (
2015-04-20 14:00:19 -07:00
CheckQueueBufferSize = 100
NodeQueueBufferSize = 100
GetPeersTickerSeconds = 5
2015-04-20 02:13:04 -07:00
)
//---------------------------------------------------------------------------------------
// crawler.Node
// A node is a peer on the network
2015-04-20 02:13:04 -07:00
type Node struct {
Host string
P2PPort uint16
RPCPort uint16
failed int
connected bool
client *NodeClient
2015-04-20 14:00:19 -07:00
LastSeen time.Time
ChainID string
2015-04-20 14:00:19 -07:00
BlockHeight uint
BlockHistory map[uint]time.Time // when we saw each block
NetInfo *rpctypes.ResponseNetInfo
2015-04-20 02:13:04 -07:00
Validator bool
// other peers we heard about this peer from
heardFrom map[string]struct{}
}
func (n *Node) Address() string {
return fmt.Sprintf("%s:%d", n.Host, n.RPCPort)
}
// Set the basic status and chain_id info for a node from RPC responses
2015-04-20 02:13:04 -07:00
func (n *Node) SetInfo(status *rpctypes.ResponseStatus, netinfo *rpctypes.ResponseNetInfo) {
n.LastSeen = time.Now()
n.ChainID = status.ChainID
2015-04-20 02:13:04 -07:00
n.BlockHeight = status.LatestBlockHeight
n.NetInfo = netinfo
// n.Validator
}
// A node client is used to talk to a node over rpc and websockets
type NodeClient struct {
rpc rpcclient.Client
ws *rpcclient.WSClient
}
// Create a new client for the node at the given addr
func NewNodeClient(addr string) *NodeClient {
return &NodeClient{
2015-04-20 14:00:19 -07:00
rpc: rpcclient.NewClient("http://"+addr, "JSONRPC"),
ws: rpcclient.NewWSClient("ws://" + addr + "/events"),
2015-04-20 02:13:04 -07:00
}
}
// A simple wrapper for mediating access to the maps
type nodeInfo struct {
2015-04-20 14:00:19 -07:00
host string // the new nodes address
port uint16 // node's listening port
from string // the peer that told us about this node
connected bool // move node from nodePool to nodes
disconnected bool // move node from nodes to nodePool
2015-04-20 02:13:04 -07:00
}
2015-04-20 14:00:19 -07:00
func (ni nodeInfo) unpack() (string, uint16, string, bool, bool) {
return ni.host, ni.port, ni.from, ni.connected, ni.disconnected
2015-04-20 02:13:04 -07:00
}
// crawler.Node
//---------------------------------------------------------------------------------------
// crawler.Crawler
2015-04-20 14:00:19 -07:00
// A crawler has a local node, a set of potential nodes in the nodePool, and connected nodes.
// Maps are only accessed by one go-routine, mediated by the checkQueue
2015-04-20 02:13:04 -07:00
type Crawler struct {
self *Node
client *NodeClient
checkQueue chan nodeInfo
nodePool map[string]*Node
nodes map[string]*Node
nodeQueue chan *Node
quit chan struct{}
}
// Create a new Crawler using the local RPC server at addr
func NewCrawler(host string, port uint16) *Crawler {
return &Crawler{
2015-04-20 14:00:19 -07:00
self: &Node{Host: host, RPCPort: port, client: NewNodeClient(fmt.Sprintf("%s:%d", host, port))},
2015-04-20 02:13:04 -07:00
checkQueue: make(chan nodeInfo, CheckQueueBufferSize),
nodePool: make(map[string]*Node),
nodes: make(map[string]*Node),
nodeQueue: make(chan *Node, NodeQueueBufferSize),
quit: make(chan struct{}),
}
}
func (c *Crawler) Start() error {
2015-04-20 14:00:19 -07:00
// connect to local node first, set info,
// and fire peers onto the checkQueue
if err := c.pollNode(c.self); err != nil {
2015-04-20 02:13:04 -07:00
return err
}
2015-04-20 14:00:19 -07:00
// connect to weboscket, subscribe to local events
// and run the read loop to listen for new blocks
if r, err := c.self.client.ws.Dial(); err != nil {
fmt.Println(r)
b, _ := ioutil.ReadAll(r.Body)
fmt.Println(string(b))
2015-04-20 02:13:04 -07:00
return err
}
2015-04-20 14:00:19 -07:00
if err := c.self.client.ws.Subscribe(types.EventStringNewBlock()); err != nil {
2015-04-20 02:13:04 -07:00
return err
}
2015-04-20 14:00:19 -07:00
go c.readLoop(c.self)
2015-04-20 02:13:04 -07:00
// add ourselves to the nodes list
c.nodes[c.self.Address()] = c.self
2015-04-20 14:00:19 -07:00
// nodes we hear about get put on the checkQueue
// by pollNode and are handled in the checkLoop.
2015-04-20 02:13:04 -07:00
// if its a node we're not already connected to,
2015-04-20 14:00:19 -07:00
// it gets put on the nodeQueue and
2015-04-20 02:13:04 -07:00
// we attempt to connect in the connectLoop
go c.checkLoop()
go c.connectLoop()
return nil
}
func (c *Crawler) Stop() {
close(c.quit)
}
2015-04-20 14:00:19 -07:00
// listen for events from the node and ping it for peers on a ticker
func (c *Crawler) readLoop(node *Node) {
wsChan := node.client.ws.Read()
getPeersTicker := time.Tick(time.Second * GetPeersTickerSeconds)
for {
select {
case wsMsg := <-wsChan:
// update the node with his new info
if err := c.consumeMessage(wsMsg, node); err != nil {
// lost the node, put him back on the checkQueu
c.checkNode(nodeInfo{
host: node.Host,
port: node.RPCPort,
disconnected: true,
})
}
case <-getPeersTicker:
if err := c.pollNode(node); err != nil {
// lost the node, put him back on the checkQueu
c.checkNode(nodeInfo{
host: node.Host,
port: node.RPCPort,
disconnected: true,
})
}
case <-c.quit:
return
}
}
}
func (c *Crawler) consumeMessage(wsMsg *rpcclient.WSMsg, node *Node) error {
if wsMsg.Error != nil {
return wsMsg.Error
}
// unmarshal block event
var response struct {
Event string
Data *types.Block
Error string
}
var err error
binary.ReadJSON(&response, wsMsg.Data, &err)
if err != nil {
return err
}
if response.Error != "" {
return fmt.Errorf(response.Error)
}
block := response.Data
node.LastSeen = time.Now()
node.BlockHeight = block.Height
node.BlockHistory[block.Height] = node.LastSeen
return nil
2015-04-20 02:13:04 -07:00
}
// check nodes against the nodePool map one at a time
2015-04-20 14:00:19 -07:00
// acts as a mutex on nodePool and nodes
2015-04-20 02:13:04 -07:00
func (c *Crawler) checkLoop() {
for {
select {
case ni := <-c.checkQueue:
2015-04-20 14:00:19 -07:00
host, port, from, connected, disconnected := ni.unpack()
2015-04-20 02:13:04 -07:00
addr := fmt.Sprintf("%s:%d", host, port)
2015-04-20 14:00:19 -07:00
// check if we need to swap node between maps (eg. its connected or disconnected)
// NOTE: once we hear about a node, we never forget ...
if connected {
2015-04-20 02:13:04 -07:00
n, _ := c.nodePool[addr]
2015-04-20 14:00:19 -07:00
c.nodes[addr] = n
2015-04-20 02:13:04 -07:00
delete(c.nodePool, addr)
continue
2015-04-20 14:00:19 -07:00
} else if disconnected {
n, _ := c.nodes[addr]
c.nodePool[addr] = n
delete(c.nodes, addr)
continue
2015-04-20 02:13:04 -07:00
}
// TODO: if address is badly formed
// we should punish ni.from
_ = from
n, ok := c.nodePool[addr]
// create the node if unknown
if !ok {
n = &Node{Host: host, RPCPort: port}
2015-04-20 14:00:19 -07:00
c.nodePool[addr] = n
2015-04-20 02:13:04 -07:00
} else if n.connected {
// should be removed soon
continue
}
// queue it for connecting to
c.nodeQueue <- n
case <-c.quit:
return
}
}
}
// read off the nodeQueue and attempt to connect to nodes
func (c *Crawler) connectLoop() {
for {
select {
case node := <-c.nodeQueue:
go c.connectToNode(node)
case <-c.quit:
// close all connections
for addr, node := range c.nodes {
_, _ = addr, node
// TODO: close conn
}
return
}
}
}
func (c *Crawler) connectToNode(node *Node) {
addr := node.Address()
node.client = NewNodeClient(addr)
2015-04-20 14:00:19 -07:00
if b, err := node.client.ws.Dial(); err != nil {
fmt.Println("err on ws dial:", b, err)
2015-04-20 02:13:04 -07:00
// set failed, return
}
// remove from nodePool, add to nodes
c.checkNode(nodeInfo{
2015-04-20 14:00:19 -07:00
host: node.Host,
port: node.RPCPort,
connected: true,
2015-04-20 02:13:04 -07:00
})
2015-04-20 14:00:19 -07:00
if err := c.pollNode(node); err != nil {
// TODO: we had a good ws con
// but failed on rpc?!
// try again or something ...
// if we still fail, report and disconnect
}
2015-04-20 02:13:04 -07:00
2015-04-20 14:00:19 -07:00
fmt.Println("Successfully connected to node", node.Address())
// blocks (until quit or err)
c.readLoop(node)
}
func (c *Crawler) checkNode(ni nodeInfo) {
c.checkQueue <- ni
2015-04-20 02:13:04 -07:00
}
func (c *Crawler) pollNode(node *Node) error {
2015-04-20 14:00:19 -07:00
// get the status info
2015-04-20 02:13:04 -07:00
status, err := node.client.rpc.Status()
if err != nil {
return err
}
2015-04-20 14:00:19 -07:00
// get peers and net info
2015-04-20 02:13:04 -07:00
netinfo, err := node.client.rpc.NetInfo()
if err != nil {
return err
}
2015-04-20 14:00:19 -07:00
// set the info for the node
2015-04-20 02:13:04 -07:00
node.SetInfo(status, netinfo)
// fire each peer on the checkQueue
for _, p := range netinfo.Peers {
c.checkNode(nodeInfo{
host: p.Host,
port: p.RPCPort,
from: node.Address(),
})
}
return nil
}