Merge branch 'dev/tm_bench_ignore_first_block_if_empty' into dev/tmbench_fix_end_time
This commit is contained in:
commit
93a3f701fe
|
@ -7,6 +7,7 @@ import (
|
||||||
"math"
|
"math"
|
||||||
"os"
|
"os"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"text/tabwriter"
|
"text/tabwriter"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
@ -249,20 +250,21 @@ func startTransacters(
|
||||||
) []*transacter {
|
) []*transacter {
|
||||||
transacters := make([]*transacter, len(endpoints))
|
transacters := make([]*transacter, len(endpoints))
|
||||||
|
|
||||||
|
wg := sync.WaitGroup{}
|
||||||
|
wg.Add(len(endpoints))
|
||||||
for i, e := range endpoints {
|
for i, e := range endpoints {
|
||||||
t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod)
|
t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod)
|
||||||
t.SetLogger(logger)
|
t.SetLogger(logger)
|
||||||
if err := t.Start(); err != nil {
|
go func(i int) {
|
||||||
fmt.Fprintln(os.Stderr, err)
|
defer wg.Done()
|
||||||
os.Exit(1)
|
if err := t.Start(); err != nil {
|
||||||
}
|
fmt.Fprintln(os.Stderr, err)
|
||||||
transacters[i] = t
|
os.Exit(1)
|
||||||
}
|
}
|
||||||
|
transacters[i] = t
|
||||||
// Wait until all transacters have started firing txs
|
}(i)
|
||||||
for _, t := range transacters {
|
|
||||||
t.WaitUntilAllConnectionsStartedFiringTxs()
|
|
||||||
}
|
}
|
||||||
|
wg.Wait()
|
||||||
|
|
||||||
return transacters
|
return transacters
|
||||||
}
|
}
|
||||||
|
|
|
@ -34,11 +34,11 @@ type transacter struct {
|
||||||
Connections int
|
Connections int
|
||||||
BroadcastTxMethod string
|
BroadcastTxMethod string
|
||||||
|
|
||||||
conns []*websocket.Conn
|
conns []*websocket.Conn
|
||||||
connsStarted []bool
|
connsBroken []bool
|
||||||
connsBroken []bool
|
startingWg sync.WaitGroup
|
||||||
wg sync.WaitGroup
|
endingWg sync.WaitGroup
|
||||||
stopped bool
|
stopped bool
|
||||||
|
|
||||||
logger log.Logger
|
logger log.Logger
|
||||||
}
|
}
|
||||||
|
@ -51,7 +51,6 @@ func newTransacter(target string, connections, rate int, size int, broadcastTxMe
|
||||||
Connections: connections,
|
Connections: connections,
|
||||||
BroadcastTxMethod: broadcastTxMethod,
|
BroadcastTxMethod: broadcastTxMethod,
|
||||||
conns: make([]*websocket.Conn, connections),
|
conns: make([]*websocket.Conn, connections),
|
||||||
connsStarted: make([]bool, connections),
|
|
||||||
connsBroken: make([]bool, connections),
|
connsBroken: make([]bool, connections),
|
||||||
logger: log.NewNopLogger(),
|
logger: log.NewNopLogger(),
|
||||||
}
|
}
|
||||||
|
@ -77,35 +76,22 @@ func (t *transacter) Start() error {
|
||||||
t.conns[i] = c
|
t.conns[i] = c
|
||||||
}
|
}
|
||||||
|
|
||||||
t.wg.Add(2 * t.Connections)
|
t.startingWg.Add(t.Connections)
|
||||||
|
t.endingWg.Add(2 * t.Connections)
|
||||||
for i := 0; i < t.Connections; i++ {
|
for i := 0; i < t.Connections; i++ {
|
||||||
go t.sendLoop(i)
|
go t.sendLoop(i)
|
||||||
go t.receiveLoop(i)
|
go t.receiveLoop(i)
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
t.startingWg.Wait()
|
||||||
}
|
|
||||||
|
|
||||||
// WaitUntilAllConnectionsStartedFiringTxs waits until all of this
|
return nil
|
||||||
// transacters connections have begun sending txs at the specified rate
|
|
||||||
func (t *transacter) WaitUntilAllConnectionsStartedFiringTxs() {
|
|
||||||
for {
|
|
||||||
started := true
|
|
||||||
for i := 0; i < t.Connections; i++ {
|
|
||||||
if !t.connsStarted[i] && !t.connsBroken[i] {
|
|
||||||
started = false
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if started {
|
|
||||||
break
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Stop closes the connections.
|
// Stop closes the connections.
|
||||||
func (t *transacter) Stop() {
|
func (t *transacter) Stop() {
|
||||||
t.stopped = true
|
t.stopped = true
|
||||||
t.wg.Wait()
|
t.endingWg.Wait()
|
||||||
for _, c := range t.conns {
|
for _, c := range t.conns {
|
||||||
c.Close()
|
c.Close()
|
||||||
}
|
}
|
||||||
|
@ -115,7 +101,7 @@ func (t *transacter) Stop() {
|
||||||
// `broadcast_tx_async`).
|
// `broadcast_tx_async`).
|
||||||
func (t *transacter) receiveLoop(connIndex int) {
|
func (t *transacter) receiveLoop(connIndex int) {
|
||||||
c := t.conns[connIndex]
|
c := t.conns[connIndex]
|
||||||
defer t.wg.Done()
|
defer t.endingWg.Done()
|
||||||
for {
|
for {
|
||||||
_, _, err := c.ReadMessage()
|
_, _, err := c.ReadMessage()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -136,6 +122,13 @@ func (t *transacter) receiveLoop(connIndex int) {
|
||||||
|
|
||||||
// sendLoop generates transactions at a given rate.
|
// sendLoop generates transactions at a given rate.
|
||||||
func (t *transacter) sendLoop(connIndex int) {
|
func (t *transacter) sendLoop(connIndex int) {
|
||||||
|
started := false
|
||||||
|
// Close the starting waitgroup, in the event that this fails to start
|
||||||
|
defer func() {
|
||||||
|
if !started {
|
||||||
|
t.startingWg.Done()
|
||||||
|
}
|
||||||
|
}()
|
||||||
c := t.conns[connIndex]
|
c := t.conns[connIndex]
|
||||||
|
|
||||||
c.SetPingHandler(func(message string) error {
|
c.SetPingHandler(func(message string) error {
|
||||||
|
@ -157,7 +150,7 @@ func (t *transacter) sendLoop(connIndex int) {
|
||||||
defer func() {
|
defer func() {
|
||||||
pingsTicker.Stop()
|
pingsTicker.Stop()
|
||||||
txsTicker.Stop()
|
txsTicker.Stop()
|
||||||
t.wg.Done()
|
t.endingWg.Done()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// hash of the host name is a part of each tx
|
// hash of the host name is a part of each tx
|
||||||
|
@ -174,7 +167,10 @@ func (t *transacter) sendLoop(connIndex int) {
|
||||||
startTime := time.Now()
|
startTime := time.Now()
|
||||||
endTime := startTime.Add(time.Second)
|
endTime := startTime.Add(time.Second)
|
||||||
numTxSent := t.Rate
|
numTxSent := t.Rate
|
||||||
t.connsStarted[connIndex] = true
|
if !started {
|
||||||
|
t.startingWg.Done()
|
||||||
|
started = true
|
||||||
|
}
|
||||||
|
|
||||||
for i := 0; i < t.Rate; i++ {
|
for i := 0; i < t.Rate; i++ {
|
||||||
// each transaction embeds connection index, tx number and hash of the hostname
|
// each transaction embeds connection index, tx number and hash of the hostname
|
||||||
|
|
Loading…
Reference in New Issue