tendermint/tm-bench/transacter.go

182 lines
4.2 KiB
Go
Raw Normal View History

package main
2017-03-13 08:10:51 -07:00
import (
"encoding/binary"
"encoding/hex"
"fmt"
2017-03-17 03:52:14 -07:00
"math/rand"
"net/http"
"net/url"
2017-03-22 04:53:30 -07:00
"os"
2017-03-13 08:10:51 -07:00
"sync"
"time"
"github.com/go-kit/kit/log"
"github.com/gorilla/websocket"
2017-03-13 08:10:51 -07:00
"github.com/pkg/errors"
rpctypes "github.com/tendermint/go-rpc/types"
)
const (
sendTimeout = 500 * time.Millisecond
// see https://github.com/tendermint/go-rpc/blob/develop/server/handlers.go#L313
pingPeriod = (30 * 9 / 10) * time.Second
)
2017-03-13 08:10:51 -07:00
type transacter struct {
2017-03-17 02:13:06 -07:00
Target string
Rate int
Connections int
2017-03-13 08:10:51 -07:00
conns []*websocket.Conn
2017-03-13 08:10:51 -07:00
wg sync.WaitGroup
2017-03-17 02:13:06 -07:00
stopped bool
logger log.Logger
2017-03-13 08:10:51 -07:00
}
2017-03-17 02:13:06 -07:00
func newTransacter(target string, connections int, rate int) *transacter {
return &transacter{
2017-03-17 02:13:06 -07:00
Target: target,
Rate: rate,
Connections: connections,
conns: make([]*websocket.Conn, connections),
logger: log.NewNopLogger(),
}
}
// SetLogger lets you set your own logger
func (t *transacter) SetLogger(l log.Logger) {
t.logger = l
}
// Start opens N = `t.Connections` connections to the target and creates read
// and write goroutines for each connection.
2017-03-13 08:10:51 -07:00
func (t *transacter) Start() error {
t.stopped = false
2017-03-17 02:13:06 -07:00
for i := 0; i < t.Connections; i++ {
c, _, err := connect(t.Target)
if err != nil {
2017-03-17 02:13:06 -07:00
return err
}
t.conns[i] = c
2017-03-17 02:13:06 -07:00
}
t.wg.Add(2 * t.Connections)
2017-03-17 02:13:06 -07:00
for i := 0; i < t.Connections; i++ {
go t.sendLoop(i)
go t.receiveLoop(i)
2017-03-13 08:10:51 -07:00
}
2017-03-17 02:13:06 -07:00
2017-03-13 08:10:51 -07:00
return nil
}
// Stop closes the connections.
2017-03-13 08:10:51 -07:00
func (t *transacter) Stop() {
t.stopped = true
t.wg.Wait()
2017-03-17 02:13:06 -07:00
for _, c := range t.conns {
c.Close()
}
}
// receiveLoop reads messages from the connection (empty in case of
// `broadcast_tx_async`).
func (t *transacter) receiveLoop(connIndex int) {
c := t.conns[connIndex]
defer t.wg.Done()
for {
_, _, err := c.ReadMessage()
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseNormalClosure) {
t.logger.Log("err", errors.Wrap(err, "failed to read response"))
}
return
}
if t.stopped {
return
}
2017-03-17 02:13:06 -07:00
}
2017-03-13 08:10:51 -07:00
}
// sendLoop generates transactions at a given rate.
2017-03-17 02:13:06 -07:00
func (t *transacter) sendLoop(connIndex int) {
c := t.conns[connIndex]
logger := log.With(t.logger, "addr", c.RemoteAddr())
var txNumber = 0
pingsTicker := time.NewTicker(pingPeriod)
txsTicker := time.NewTicker(1 * time.Second)
defer func() {
pingsTicker.Stop()
txsTicker.Stop()
t.wg.Done()
}()
2017-03-13 08:10:51 -07:00
for {
select {
case <-txsTicker.C:
startTime := time.Now()
for i := 0; i < t.Rate; i++ {
// each transaction embeds connection index and tx number
tx := generateTx(connIndex, txNumber)
c.SetWriteDeadline(time.Now().Add(sendTimeout))
err := c.WriteJSON(rpctypes.RPCRequest{
JSONRPC: "2.0",
ID: "",
Method: "broadcast_tx_async",
Params: []interface{}{hex.EncodeToString(tx)},
})
if err != nil {
fmt.Printf("%v. Try increasing the connections count and reducing the rate.\n", errors.Wrap(err, "txs send failed"))
os.Exit(1)
}
txNumber++
}
2017-03-17 02:13:06 -07:00
timeToSend := time.Now().Sub(startTime)
time.Sleep(time.Second - timeToSend)
logger.Log("event", fmt.Sprintf("sent %d transactions", t.Rate), "took", timeToSend)
case <-pingsTicker.C:
// Right now go-rpc server closes the connection in the absence of pings
c.SetWriteDeadline(time.Now().Add(sendTimeout))
if err := c.WriteMessage(websocket.PingMessage, []byte{}); err != nil {
logger.Log("err", errors.Wrap(err, "failed to write ping message"))
2017-03-17 02:13:06 -07:00
}
}
2017-03-17 02:13:06 -07:00
if t.stopped {
// To cleanly close a connection, a client should send a close
// frame and wait for the server to close the connection.
c.SetWriteDeadline(time.Now().Add(sendTimeout))
err := c.WriteMessage(websocket.CloseMessage, websocket.FormatCloseMessage(websocket.CloseNormalClosure, ""))
2017-03-13 08:10:51 -07:00
if err != nil {
logger.Log("err", errors.Wrap(err, "failed to write close message"))
2017-03-13 08:10:51 -07:00
}
return
}
2017-03-13 08:10:51 -07:00
}
}
func connect(host string) (*websocket.Conn, *http.Response, error) {
u := url.URL{Scheme: "ws", Host: host, Path: "/websocket"}
return websocket.DefaultDialer.Dial(u.String(), nil)
}
2017-03-17 02:13:06 -07:00
func generateTx(a int, b int) []byte {
2017-03-13 08:10:51 -07:00
tx := make([]byte, 250)
2017-03-17 02:13:06 -07:00
binary.PutUvarint(tx[:32], uint64(a))
binary.PutUvarint(tx[32:64], uint64(b))
2017-03-13 08:10:51 -07:00
if _, err := rand.Read(tx[234:]); err != nil {
panic(errors.Wrap(err, "failed to generate transaction"))
2017-03-13 08:10:51 -07:00
}
return tx
}