From 41c282b38bef8553c7d12c9eef321887042159e9 Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Tue, 27 Aug 2019 20:07:37 +0800 Subject: [PATCH] fix buffer size bug --- lib/common/netpackager.go | 28 +++++++++++++++++++++------- lib/common/util.go | 20 ++++++++++---------- lib/mux/mux.go | 2 +- lib/pool/pool.go | 5 +++-- 4 files changed, 35 insertions(+), 20 deletions(-) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index abaff31..2d589aa 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -27,10 +27,13 @@ func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { case nil: Self.Content = Self.Content[:0] case []byte: - Self.Content = append(Self.Content, content.([]byte)...) + err = Self.appendByte(content.([]byte)) case string: - Self.Content = append(Self.Content, []byte(content.(string))...) - Self.Content = append(Self.Content, []byte(CONN_DATA_SEQ)...) + err = Self.appendByte([]byte(content.(string))) + if err != nil { + return + } + err = Self.appendByte([]byte(CONN_DATA_SEQ)) default: err = Self.marshal(content) } @@ -39,6 +42,18 @@ func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { return } +func (Self *BasePackager) appendByte(data []byte) (err error) { + m := len(Self.Content) + n := m + len(data) + if n <= cap(Self.Content) { + Self.Content = Self.Content[0:n] // grow the length for copy + copy(Self.Content[m:n], data) + return nil + } else { + return bytes.ErrTooLarge + } +} + //似乎这里涉及到父类作用域问题,当子类调用父类的方法时,其struct仅仅为父类的 func (Self *BasePackager) Pack(writer io.Writer) (err error) { err = binary.Write(writer, binary.LittleEndian, Self.Length) @@ -53,12 +68,12 @@ func (Self *BasePackager) Pack(writer io.Writer) (err error) { //Unpack 会导致传入的数字类型转化成float64!! //主要原因是json unmarshal并未传入正确的数据类型 func (Self *BasePackager) UnPack(reader io.Reader) (err error) { + Self.Content = pool.CopyBuff.Get() Self.clean() err = binary.Read(reader, binary.LittleEndian, &Self.Length) if err != nil { return } - Self.Content = pool.CopyBuff.Get() Self.Content = Self.Content[:Self.Length] //n, err := io.ReadFull(reader, Self.Content) //if n != int(Self.Length) { @@ -73,7 +88,7 @@ func (Self *BasePackager) marshal(content interface{}) (err error) { if err != nil { return err } - Self.Content = append(Self.Content, tmp...) + err = Self.appendByte(tmp) return } @@ -92,7 +107,7 @@ func (Self *BasePackager) setLength() { func (Self *BasePackager) clean() { Self.Length = 0 - Self.Content = Self.Content[:0] + Self.Content = Self.Content[:0] // reset length } func (Self *BasePackager) Split() (strList []string) { @@ -162,7 +177,6 @@ func (Self *MuxPackager) Pack(writer io.Writer) (err error) { } func (Self *MuxPackager) UnPack(reader io.Reader) (err error) { - Self.Length = 0 err = binary.Read(reader, binary.LittleEndian, &Self.Flag) if err != nil { return diff --git a/lib/common/util.go b/lib/common/util.go index 812c543..25cfee2 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -264,13 +264,19 @@ func GetPortByAddr(addr string) int { return p } -func CopyBuffer(dst io.Writer, src io.Reader,connId int32) (written int64, err error) { - buf := pool.GetBufPoolCopy() - defer pool.PutBufPoolCopy(buf) +func CopyBuffer(dst io.Writer, src io.Reader, connId int32) (written int64, err error) { + buf := pool.CopyBuff.Get() + defer pool.CopyBuff.Put(buf) for { nr, er := src.Read(buf) + if er != nil { + if er != io.EOF { + err = er + } + break + } if nr > 0 { - logs.Warn("write",connId, nr, string(buf[0:10])) + logs.Warn("write", connId, nr, string(buf[0:10])) nw, ew := dst.Write(buf[0:nr]) if nw > 0 { written += int64(nw) @@ -284,12 +290,6 @@ func CopyBuffer(dst io.Writer, src io.Reader,connId int32) (written int64, err e break } } - if er != nil { - if er != io.EOF { - err = er - } - break - } } return written, err } diff --git a/lib/mux/mux.go b/lib/mux/mux.go index 2073593..ad17cb0 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -228,7 +228,7 @@ func (s *Mux) Close() error { select { case s.closeChan <- struct{}{}: } - s.closeChan <- struct{}{} + //s.closeChan <- struct{}{} close(s.writeQueue) close(s.newConnCh) return s.conn.Close() diff --git a/lib/pool/pool.go b/lib/pool/pool.go index 26a91f5..0540a9d 100644 --- a/lib/pool/pool.go +++ b/lib/pool/pool.go @@ -66,13 +66,14 @@ type CopyBufferPool struct { func (Self *CopyBufferPool) New() { Self.pool = sync.Pool{ New: func() interface{} { - return make([]byte, PoolSizeCopy) + return make([]byte, PoolSizeCopy, PoolSizeCopy) }, } } func (Self *CopyBufferPool) Get() []byte { - return Self.pool.Get().([]byte) + buf := Self.pool.Get().([]byte) + return buf[:cap(buf)] // grow to capacity } func (Self *CopyBufferPool) Put(x []byte) {