Use waitgroups for starting up

This commit is contained in:
ValarDragon 2018-07-12 15:37:46 -07:00
parent bd050c1d03
commit e46ae15859
2 changed files with 35 additions and 37 deletions

View File

@ -7,6 +7,7 @@ import (
"math" "math"
"os" "os"
"strings" "strings"
"sync"
"text/tabwriter" "text/tabwriter"
"time" "time"
@ -247,20 +248,21 @@ func startTransacters(
) []*transacter { ) []*transacter {
transacters := make([]*transacter, len(endpoints)) transacters := make([]*transacter, len(endpoints))
wg := sync.WaitGroup{}
wg.Add(len(endpoints))
for i, e := range endpoints { for i, e := range endpoints {
t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod) t := newTransacter(e, connections, txsRate, txSize, broadcastTxMethod)
t.SetLogger(logger) t.SetLogger(logger)
if err := t.Start(); err != nil { go func(i int) {
fmt.Fprintln(os.Stderr, err) defer wg.Done()
os.Exit(1) if err := t.Start(); err != nil {
} fmt.Fprintln(os.Stderr, err)
transacters[i] = t os.Exit(1)
} }
transacters[i] = t
// Wait until all transacters have started firing txs }(i)
for _, t := range transacters {
t.WaitUntilAllConnectionsStartedFiringTxs()
} }
wg.Wait()
return transacters return transacters
} }

View File

@ -34,11 +34,11 @@ type transacter struct {
Connections int Connections int
BroadcastTxMethod string BroadcastTxMethod string
conns []*websocket.Conn conns []*websocket.Conn
connsStarted []bool connsBroken []bool
connsBroken []bool startingWg sync.WaitGroup
wg sync.WaitGroup endingWg sync.WaitGroup
stopped bool stopped bool
logger log.Logger logger log.Logger
} }
@ -51,7 +51,6 @@ func newTransacter(target string, connections, rate int, size int, broadcastTxMe
Connections: connections, Connections: connections,
BroadcastTxMethod: broadcastTxMethod, BroadcastTxMethod: broadcastTxMethod,
conns: make([]*websocket.Conn, connections), conns: make([]*websocket.Conn, connections),
connsStarted: make([]bool, connections),
connsBroken: make([]bool, connections), connsBroken: make([]bool, connections),
logger: log.NewNopLogger(), logger: log.NewNopLogger(),
} }
@ -77,35 +76,22 @@ func (t *transacter) Start() error {
t.conns[i] = c t.conns[i] = c
} }
t.wg.Add(2 * t.Connections) t.startingWg.Add(t.Connections)
t.endingWg.Add(2 * t.Connections)
for i := 0; i < t.Connections; i++ { for i := 0; i < t.Connections; i++ {
go t.sendLoop(i) go t.sendLoop(i)
go t.receiveLoop(i) go t.receiveLoop(i)
} }
return nil t.startingWg.Wait()
}
// WaitUntilAllConnectionsStartedFiringTxs waits until all of this return nil
// transacters connections have begun sending txs at the specified rate
func (t *transacter) WaitUntilAllConnectionsStartedFiringTxs() {
for {
started := true
for i := 0; i < t.Connections; i++ {
if !t.connsStarted[i] && !t.connsBroken[i] {
started = false
}
}
if started {
break
}
}
} }
// Stop closes the connections. // Stop closes the connections.
func (t *transacter) Stop() { func (t *transacter) Stop() {
t.stopped = true t.stopped = true
t.wg.Wait() t.endingWg.Wait()
for _, c := range t.conns { for _, c := range t.conns {
c.Close() c.Close()
} }
@ -115,7 +101,7 @@ func (t *transacter) Stop() {
// `broadcast_tx_async`). // `broadcast_tx_async`).
func (t *transacter) receiveLoop(connIndex int) { func (t *transacter) receiveLoop(connIndex int) {
c := t.conns[connIndex] c := t.conns[connIndex]
defer t.wg.Done() defer t.endingWg.Done()
for { for {
_, _, err := c.ReadMessage() _, _, err := c.ReadMessage()
if err != nil { if err != nil {
@ -136,6 +122,13 @@ func (t *transacter) receiveLoop(connIndex int) {
// sendLoop generates transactions at a given rate. // sendLoop generates transactions at a given rate.
func (t *transacter) sendLoop(connIndex int) { func (t *transacter) sendLoop(connIndex int) {
started := false
// Close the starting waitgroup, in the event that this fails to start
defer func() {
if !started {
t.startingWg.Done()
}
}()
c := t.conns[connIndex] c := t.conns[connIndex]
c.SetPingHandler(func(message string) error { c.SetPingHandler(func(message string) error {
@ -157,7 +150,7 @@ func (t *transacter) sendLoop(connIndex int) {
defer func() { defer func() {
pingsTicker.Stop() pingsTicker.Stop()
txsTicker.Stop() txsTicker.Stop()
t.wg.Done() t.endingWg.Done()
}() }()
// hash of the host name is a part of each tx // hash of the host name is a part of each tx
@ -174,7 +167,10 @@ func (t *transacter) sendLoop(connIndex int) {
startTime := time.Now() startTime := time.Now()
endTime := startTime.Add(time.Second) endTime := startTime.Add(time.Second)
numTxSent := t.Rate numTxSent := t.Rate
t.connsStarted[connIndex] = true if !started {
t.startingWg.Done()
started = true
}
for i := 0; i < t.Rate; i++ { for i := 0; i < t.Rate; i++ {
// each transaction embeds connection index, tx number and hash of the hostname // each transaction embeds connection index, tx number and hash of the hostname