mirror of https://github.com/poanetwork/gecko.git
Limit the number of bytes in the send queue
This commit is contained in:
parent
86cd884ae2
commit
9e1d24c239
|
@ -124,6 +124,7 @@ type network struct {
|
|||
b Builder
|
||||
|
||||
stateLock sync.Mutex
|
||||
pendingBytes int
|
||||
closed bool
|
||||
disconnectedIPs map[string]struct{}
|
||||
connectedIPs map[string]struct{}
|
||||
|
|
|
@ -31,6 +31,10 @@ type peer struct {
|
|||
// state lock held.
|
||||
closed bool
|
||||
|
||||
// number of bytes currently in the send queue, is only modifed when the
|
||||
// network state lock held.
|
||||
pendingBytes int
|
||||
|
||||
// queue of messages this connection is attempting to send the peer. Is
|
||||
// closed when the connection is closed.
|
||||
sender chan []byte
|
||||
|
@ -155,6 +159,10 @@ func (p *peer) WriteMessages() {
|
|||
p.id,
|
||||
formatting.DumpBytes{Bytes: msg})
|
||||
|
||||
p.net.stateLock.Lock()
|
||||
p.pendingBytes -= len(msg)
|
||||
p.net.stateLock.Unlock()
|
||||
|
||||
packer := wrappers.Packer{Bytes: make([]byte, len(msg)+wrappers.IntLen)}
|
||||
packer.PackBytes(msg)
|
||||
msg = packer.Bytes
|
||||
|
@ -184,8 +192,21 @@ func (p *peer) send(msg Msg) bool {
|
|||
p.net.log.Debug("dropping message to %s due to a closed connection", p.id)
|
||||
return false
|
||||
}
|
||||
|
||||
msgBytes := msg.Bytes()
|
||||
newPendingBytes := p.net.pendingBytes + len(msgBytes)
|
||||
newConnPendingBytes := p.pendingBytes + len(msgBytes)
|
||||
if newPendingBytes > p.net.networkPendingSendBytesToRateLimit && // Check to see if we should be enforcing any rate limiting
|
||||
(newPendingBytes > p.net.maxNetworkPendingSendBytes || // Check to see if this message would put too much memory into the network
|
||||
newConnPendingBytes > p.net.maxNetworkPendingSendBytes/len(p.net.peers)) { // Check to see if this connection is using too much memory
|
||||
p.net.log.Debug("dropping message to %s due to a send queue with too many bytes", p.id)
|
||||
return false
|
||||
}
|
||||
|
||||
select {
|
||||
case p.sender <- msg.Bytes():
|
||||
case p.sender <- msgBytes:
|
||||
p.net.pendingBytes = newPendingBytes
|
||||
p.pendingBytes = newConnPendingBytes
|
||||
return true
|
||||
default:
|
||||
p.net.log.Debug("dropping message to %s due to a full send queue", p.id)
|
||||
|
|
Loading…
Reference in New Issue