From 23b023c562e582d3c52f1ade00168ab5e900dc0c Mon Sep 17 00:00:00 2001 From: ffdfgdfg Date: Mon, 21 Oct 2019 11:55:29 +0800 Subject: [PATCH] add lock free queue --- lib/common/util.go | 2 +- lib/mux/conn.go | 16 +- lib/mux/mux.go | 10 +- lib/mux/mux_test.go | 132 +++++++++++---- lib/mux/queue.go | 390 ++++++++++++++++++++++++++++++++++---------- 5 files changed, 419 insertions(+), 131 deletions(-) diff --git a/lib/common/util.go b/lib/common/util.go index e3dfb4f..1f54a6f 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -263,7 +263,7 @@ func GetPortByAddr(addr string) int { return p } -func CopyBuffer(dst io.Writer, src io.Reader) (written int64, err error) { +func CopyBuffer(dst io.Writer, src io.Reader, label ...string) (written int64, err error) { buf := CopyBuff.Get() defer CopyBuff.Put(buf) for { diff --git a/lib/mux/conn.go b/lib/mux/conn.go index dc3063c..3011732 100644 --- a/lib/mux/conn.go +++ b/lib/mux/conn.go @@ -2,10 +2,12 @@ package mux import ( "errors" + "github.com/astaxie/beego/logs" "io" "net" "strconv" "sync" + "sync/atomic" "time" "github.com/cnlh/nps/lib/common" @@ -65,7 +67,7 @@ func (s *conn) Read(buf []byte) (n int, err error) { errstr = err.Error() } d := getM(s.label, int(s.connId)) - d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr) + d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100])) setM(s.label, int(s.connId), d) return } @@ -187,11 +189,7 @@ func (Self *ReceiveWindow) RemainingSize() (n uint32) { func (Self *ReceiveWindow) ReadSize() (n uint32) { // acknowledge the size already read - Self.bufQueue.mutex.Lock() - n = Self.readLength - Self.readLength = 0 - Self.bufQueue.mutex.Unlock() - return + return atomic.SwapUint32(&Self.readLength, 0) } func (Self *ReceiveWindow) CalcSize() { @@ -270,10 +268,8 @@ copyData: //Self.bw.SetCopySize(l) pOff += l Self.off += uint32(l) - Self.bufQueue.mutex.Lock() - Self.readLength += uint32(l) + atomic.AddUint32(&Self.readLength, uint32(l)) //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len()) - Self.bufQueue.mutex.Unlock() n += l l = 0 //Self.bw.EndRead() @@ -422,6 +418,7 @@ func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) { if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE { sendSize = common.MAXIMUM_SEGMENT_SIZE part = true + logs.Warn("cut buf by mss") } else { sendSize = uint32(len(Self.buf[Self.off:])) part = false @@ -430,6 +427,7 @@ func (Self *SendWindow) WriteTo() (p []byte, part bool, err error) { // usable window size is small than // window MAXIMUM_SEGMENT_SIZE or send buf left sendSize = Self.RemainingSize() + logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) part = true } //logs.Warn("send size", sendSize) diff --git a/lib/mux/mux.go b/lib/mux/mux.go index c24c0bc..6f08641 100644 --- a/lib/mux/mux.go +++ b/lib/mux/mux.go @@ -34,6 +34,8 @@ type Mux struct { } func NewMux(c net.Conn, connType string) *Mux { + //c.(*net.TCPConn).SetReadBuffer(0) + //c.(*net.TCPConn).SetWriteBuffer(0) m := &Mux{ conn: c, connMap: NewConnMap(), @@ -173,10 +175,6 @@ func (s *Mux) ping() { select { case <-ticker.C: } - //Avoid going beyond the scope - if (math.MaxInt32 - s.id) < 10000 { - s.id = 0 - } now, _ := time.Now().UTC().MarshalText() s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) if !s.pingTimer.Stop() { @@ -321,6 +319,10 @@ func (s *Mux) Close() error { //get new connId as unique flag func (s *Mux) getId() (id int32) { + //Avoid going beyond the scope + if (math.MaxInt32 - s.id) < 10000 { + atomic.SwapInt32(&s.id, 0) + } id = atomic.AddInt32(&s.id, 1) if _, ok := s.connMap.Get(id); ok { s.getId() diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go index 1ef71f6..43e12e8 100644 --- a/lib/mux/mux_test.go +++ b/lib/mux/mux_test.go @@ -3,13 +3,16 @@ package mux import ( "bufio" "fmt" + "io" "net" "net/http" "net/http/httputil" _ "net/http/pprof" + "strconv" "sync" "testing" "time" + "unsafe" "github.com/astaxie/beego/logs" "github.com/cnlh/nps/lib/common" @@ -30,20 +33,22 @@ func TestNewMux(t *testing.T) { go func() { m2 := NewMux(conn2, "tcp") for { - logs.Warn("npc starting accept") + //logs.Warn("npc starting accept") c, err := m2.Accept() if err != nil { logs.Warn(err) continue } - logs.Warn("npc accept success ") + //logs.Warn("npc accept success ") c2, err := net.Dial("tcp", "127.0.0.1:80") if err != nil { logs.Warn(err) c.Close() continue } - go func(c2 net.Conn, c net.Conn) { + //c2.(*net.TCPConn).SetReadBuffer(0) + //c2.(*net.TCPConn).SetReadBuffer(0) + go func(c2 net.Conn, c *conn) { wg := sync.WaitGroup{} wg.Add(1) go func() { @@ -51,7 +56,7 @@ func TestNewMux(t *testing.T) { if err != nil { c2.Close() c.Close() - logs.Warn("close npc by copy from nps", err) + logs.Warn("close npc by copy from nps", err, c.connId) } wg.Done() }() @@ -61,13 +66,13 @@ func TestNewMux(t *testing.T) { if err != nil { c2.Close() c.Close() - logs.Warn("close npc by copy from server", err) + logs.Warn("close npc by copy from server", err, c.connId) } wg.Done() }() - logs.Warn("npc wait") + //logs.Warn("npc wait") wg.Wait() - }(c2, c) + }(c2, c.(*conn)) } }() @@ -78,42 +83,46 @@ func TestNewMux(t *testing.T) { logs.Warn(err) } for { - logs.Warn("nps starting accept") - conn, err := l.Accept() + //logs.Warn("nps starting accept") + conns, err := l.Accept() if err != nil { logs.Warn(err) continue } - logs.Warn("nps accept success starting new conn") + //conns.(*net.TCPConn).SetReadBuffer(0) + //conns.(*net.TCPConn).SetReadBuffer(0) + //logs.Warn("nps accept success starting new conn") tmpCpnn, err := m1.NewConn() if err != nil { logs.Warn("nps new conn err ", err) continue } logs.Warn("nps new conn success ", tmpCpnn.connId) - go func(tmpCpnn net.Conn, conn net.Conn) { + go func(tmpCpnn *conn, conns net.Conn) { go func() { - _, err := common.CopyBuffer(tmpCpnn, conn) + _, err := common.CopyBuffer(tmpCpnn, conns) if err != nil { - conn.Close() + conns.Close() tmpCpnn.Close() - logs.Warn("close nps by copy from user") + logs.Warn("close nps by copy from user", tmpCpnn.connId, err) } }() //time.Sleep(time.Second) - _, err = common.CopyBuffer(conn, tmpCpnn) + _, err = common.CopyBuffer(conns, tmpCpnn) if err != nil { - conn.Close() + conns.Close() tmpCpnn.Close() - logs.Warn("close nps by copy from npc ") + logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) } - }(tmpCpnn, conn) + }(tmpCpnn, conns) } }() go NewLogServer() time.Sleep(time.Second * 5) - //go test_request() + //for i:=0;i<1000;i++ { + // go test_raw(i) + //} for { time.Sleep(time.Second * 5) @@ -168,23 +177,40 @@ Connection: keep-alive } } -func test_raw() { - conn, _ := net.Dial("tcp", "127.0.0.1:7777") - for { - conn.Write([]byte(`GET /videojs5/test HTTP/1.1 +func test_raw(k int) { + for i := 0; i < 1; i++ { + ti := time.Now() + conn, _ := net.Dial("tcp", "127.0.0.1:7777") + tid := time.Now() + conn.Write([]byte(`GET / HTTP/1.1 Host: 127.0.0.1:7777 -Connection: keep-alive `)) - buf := make([]byte, 1000000) - n, err := conn.Read(buf) + tiw := time.Now() + buf := make([]byte, 3572) + n, err := io.ReadFull(conn, buf) + //n, err := conn.Read(buf) if err != nil { logs.Warn("close by read response err", err) break } - logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) - time.Sleep(time.Second) + //logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) + //time.Sleep(time.Second) + err = conn.Close() + if err != nil { + logs.Warn("close conn err ", err) + } + now := time.Now() + du := now.Sub(ti).Seconds() + dud := now.Sub(tid).Seconds() + duw := now.Sub(tiw).Seconds() + if du > 1 { + logs.Warn("duration long", du, dud, duw, k, i) + } + if n != 3572 { + logs.Warn("n loss", n, string(buf)) + } } } @@ -199,3 +225,53 @@ func TestNewConn(t *testing.T) { logs.Warn(copy(buf[:3], b), len(buf), cap(buf)) logs.Warn(len(buf), buf[0]) } + +func TestDQueue(t *testing.T) { + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + d := new(bufDequeue) + d.vals = make([]unsafe.Pointer, 8) + go func() { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + logs.Warn(i) + logs.Warn(d.popTail()) + } + }() + go func() { + time.Sleep(time.Second) + for i := 0; i < 10; i++ { + data := "test" + go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data))) + } + }() + time.Sleep(time.Second * 3) +} + +func TestChain(t *testing.T) { + logs.EnableFuncCallDepth(true) + logs.SetLogFuncCallDepth(3) + d := new(bufChain) + d.new(256) + go func() { + time.Sleep(time.Second) + for i := 0; i < 1000; i++ { + unsa, ok := d.popTail() + str := (*string)(unsa) + if ok { + logs.Warn(i, str, *str, ok) + } else { + logs.Warn("nil", i, ok) + } + } + }() + go func() { + time.Sleep(time.Second) + for i := 0; i < 1000; i++ { + data := "test " + strconv.Itoa(i) + logs.Warn(data, unsafe.Pointer(&data)) + go d.pushHead(unsafe.Pointer(&data)) + } + }() + time.Sleep(time.Second * 10) +} diff --git a/lib/mux/queue.go b/lib/mux/queue.go index a835e2a..ef0c904 100644 --- a/lib/mux/queue.go +++ b/lib/mux/queue.go @@ -1,19 +1,19 @@ package mux import ( - "container/list" "errors" "github.com/cnlh/nps/lib/common" "io" - "sync" + "math" + "sync/atomic" "time" + "unsafe" ) type QueueOp struct { readOp chan struct{} cleanOp chan struct{} - popWait bool - mutex sync.Mutex + popWait int32 } func (Self *QueueOp) New() { @@ -22,15 +22,15 @@ func (Self *QueueOp) New() { } func (Self *QueueOp) allowPop() (closed bool) { - Self.mutex.Lock() - Self.popWait = false - Self.mutex.Unlock() - select { - case Self.readOp <- struct{}{}: - return false - case <-Self.cleanOp: - return true + if atomic.CompareAndSwapInt32(&Self.popWait, 1, 0) { + select { + case Self.readOp <- struct{}{}: + return false + case <-Self.cleanOp: + return true + } } + return } func (Self *QueueOp) Clean() { @@ -40,84 +40,72 @@ func (Self *QueueOp) Clean() { } type PriorityQueue struct { - list *list.List QueueOp + highestChain *bufChain + middleChain *bufChain + lowestChain *bufChain + hunger uint8 } func (Self *PriorityQueue) New() { - Self.list = list.New() + Self.highestChain = new(bufChain) + Self.highestChain.new(4) + Self.middleChain = new(bufChain) + Self.middleChain.new(32) + Self.lowestChain = new(bufChain) + Self.lowestChain.new(256) Self.QueueOp.New() } func (Self *PriorityQueue) Push(packager *common.MuxPackager) { - Self.mutex.Lock() switch packager.Flag { case common.MUX_PING_FLAG, common.MUX_PING_RETURN: - Self.list.PushFront(packager) + Self.highestChain.pushHead(unsafe.Pointer(packager)) // the ping package need highest priority // prevent ping calculation error - case common.MUX_CONN_CLOSE: - Self.insert(packager) - // the close package may need priority too, set second - // prevent wait too long to close conn + case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail: + // the new conn package need some priority too + Self.middleChain.pushHead(unsafe.Pointer(packager)) default: - Self.list.PushBack(packager) + Self.lowestChain.pushHead(unsafe.Pointer(packager)) } - if Self.popWait { - Self.mutex.Unlock() - Self.allowPop() - return - } - Self.mutex.Unlock() + Self.allowPop() return } -func (Self *PriorityQueue) insert(packager *common.MuxPackager) { - element := Self.list.Back() - for { - if element == nil { // PriorityQueue dose not have any of msg package with this close package id - element = Self.list.Front() - if element != nil { - Self.list.InsertAfter(packager, element) - // insert close package to second - } else { - Self.list.PushFront(packager) - // list is empty, push to front - } - break - } - if element.Value.(*common.MuxPackager).Flag == common.MUX_NEW_MSG && - element.Value.(*common.MuxPackager).Id == packager.Id { - Self.list.InsertAfter(packager, element) // PriorityQueue has some msg package - // with this close package id, insert close package after last msg package - break - } - element = element.Prev() - } -} - func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { - Self.mutex.Lock() - element := Self.list.Front() - if element != nil { - packager = element.Value.(*common.MuxPackager) - Self.list.Remove(element) - Self.mutex.Unlock() +startPop: + ptr, ok := Self.highestChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) return } - Self.popWait = true // PriorityQueue is empty, notice Push method - Self.mutex.Unlock() - select { - case <-Self.readOp: - return Self.Pop() - case <-Self.cleanOp: - return nil + if Self.hunger < 100 { + ptr, ok = Self.middleChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + Self.hunger++ + return + } } -} - -func (Self *PriorityQueue) Len() (n int) { - n = Self.list.Len() - return + ptr, ok = Self.lowestChain.popTail() + if ok { + packager = (*common.MuxPackager)(ptr) + if Self.hunger > 0 { + Self.hunger = uint8(Self.hunger / 2) + } + return + } + // PriorityQueue is empty, notice Push method + if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) { + select { + case <-Self.readOp: + goto startPop + case <-Self.cleanOp: + return nil + } + } + goto startPop } type ListElement struct { @@ -137,36 +125,36 @@ func (Self *ListElement) New(buf []byte, l uint16, part bool) (err error) { } type FIFOQueue struct { - list []*ListElement + QueueOp + chain *bufChain length uint32 stopOp chan struct{} timeout time.Time - QueueOp } func (Self *FIFOQueue) New() { Self.QueueOp.New() + Self.chain = new(bufChain) + Self.chain.new(64) Self.stopOp = make(chan struct{}, 1) } func (Self *FIFOQueue) Push(element *ListElement) { - Self.mutex.Lock() - Self.list = append(Self.list, element) + Self.chain.pushHead(unsafe.Pointer(element)) Self.length += uint32(element.l) - if Self.popWait { - Self.mutex.Unlock() - Self.allowPop() - return - } - Self.mutex.Unlock() + Self.allowPop() return } func (Self *FIFOQueue) Pop() (element *ListElement, err error) { - Self.mutex.Lock() - if len(Self.list) == 0 { - Self.popWait = true - Self.mutex.Unlock() +startPop: + ptr, ok := Self.chain.popTail() + if ok { + element = (*ListElement)(ptr) + Self.length -= uint32(element.l) + return + } + if atomic.CompareAndSwapInt32(&Self.popWait, 0, 1) { t := Self.timeout.Sub(time.Now()) if t <= 0 { t = time.Minute @@ -175,7 +163,7 @@ func (Self *FIFOQueue) Pop() (element *ListElement, err error) { defer timer.Stop() select { case <-Self.readOp: - Self.mutex.Lock() + goto startPop case <-Self.cleanOp: return case <-Self.stopOp: @@ -186,11 +174,7 @@ func (Self *FIFOQueue) Pop() (element *ListElement, err error) { return } } - element = Self.list[0] - Self.list = Self.list[1:] - Self.length -= uint32(element.l) - Self.mutex.Unlock() - return + goto startPop } func (Self *FIFOQueue) Len() (n uint32) { @@ -204,3 +188,231 @@ func (Self *FIFOQueue) Stop() { func (Self *FIFOQueue) SetTimeOut(t time.Time) { Self.timeout = t } + +// https://golang.org/src/sync/poolqueue.go + +type bufDequeue struct { + // headTail packs together a 32-bit head index and a 32-bit + // tail index. Both are indexes into vals modulo len(vals)-1. + // + // tail = index of oldest data in queue + // head = index of next slot to fill + // + // Slots in the range [tail, head) are owned by consumers. + // A consumer continues to own a slot outside this range until + // it nils the slot, at which point ownership passes to the + // producer. + // + // The head index is stored in the most-significant bits so + // that we can atomically add to it and the overflow is + // harmless. + headTail uint64 + + // vals is a ring buffer of interface{} values stored in this + // dequeue. The size of this must be a power of 2. + // + // A slot is still in use until *both* the tail + // index has moved beyond it and typ has been set to nil. This + // is set to nil atomically by the consumer and read + // atomically by the producer. + vals []unsafe.Pointer +} + +const dequeueBits = 32 + +// dequeueLimit is the maximum size of a bufDequeue. +// +// This must be at most (1<> dequeueBits) & mask) + tail = uint32(ptrs & mask) + return +} + +func (d *bufDequeue) pack(head, tail uint32) uint64 { + const mask = 1<= dequeueLimit { + // Can't make it any bigger. + newSize = dequeueLimit + } + + d2 := &bufChainElt{prev: d} + d2.vals = make([]unsafe.Pointer, newSize) + storePoolChainElt(&c.head, d2) + storePoolChainElt(&d.next, d2) + d2.pushHead(val) + atomic.SwapInt32(&c.chainStatus, 0) + } + } +} + +func (c *bufChain) popTail() (unsafe.Pointer, bool) { + d := loadPoolChainElt(&c.tail) + if d == nil { + return nil, false + } + + for { + // It's important that we load the next pointer + // *before* popping the tail. In general, d may be + // transiently empty, but if next is non-nil before + // the pop and the pop fails, then d is permanently + // empty, which is the only condition under which it's + // safe to drop d from the chain. + d2 := loadPoolChainElt(&d.next) + + if val, ok := d.popTail(); ok { + return val, ok + } + + if d2 == nil { + // This is the only dequeue. It's empty right + // now, but could be pushed to in the future. + return nil, false + } + + // The tail of the chain has been drained, so move on + // to the next dequeue. Try to drop it from the chain + // so the next pop doesn't have to look at the empty + // dequeue again. + if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { + // We won the race. Clear the prev pointer so + // the garbage collector can collect the empty + // dequeue and so popHead doesn't back up + // further than necessary. + storePoolChainElt(&d2.prev, nil) + } + d = d2 + } +}