diff --git a/bridge/bridge.go b/bridge/bridge.go index 0827260..6f93aff 100755 --- a/bridge/bridge.go +++ b/bridge/bridge.go @@ -1,6 +1,7 @@ package bridge import ( + "ehang.io/nps-mux" "encoding/binary" "errors" "fmt" @@ -15,7 +16,6 @@ import ( "ehang.io/nps/lib/conn" "ehang.io/nps/lib/crypt" "ehang.io/nps/lib/file" - "ehang.io/nps/lib/mux" "ehang.io/nps/lib/version" "ehang.io/nps/server/connection" "ehang.io/nps/server/tool" @@ -24,14 +24,14 @@ import ( ) type Client struct { - tunnel *mux.Mux + tunnel *nps_mux.Mux signal *conn.Conn - file *mux.Mux + file *nps_mux.Mux Version string retryTime int // it will be add 1 when ping not ok until to 3 will close the client } -func NewClient(t, f *mux.Mux, s *conn.Conn, vs string) *Client { +func NewClient(t, f *nps_mux.Mux, s *conn.Conn, vs string) *Client { return &Client{ signal: s, tunnel: t, @@ -50,10 +50,10 @@ type Bridge struct { CloseClient chan int SecretChan chan *conn.Secret ipVerify bool - runList map[int]interface{} + runList sync.Map //map[int]interface{} } -func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList map[int]interface{}) *Bridge { +func NewTunnel(tunnelPort int, tunnelType string, ipVerify bool, runList sync.Map) *Bridge { return &Bridge{ TunnelPort: tunnelPort, tunnelType: tunnelType, @@ -242,7 +242,7 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int, vs string) { go s.GetHealthFromClient(id, c) logs.Info("clientId %d connection succeeded, address:%s ", id, c.Conn.RemoteAddr()) case common.WORK_CHAN: - muxConn := mux.NewMux(c.Conn, s.tunnelType) + muxConn := nps_mux.NewMux(c.Conn, s.tunnelType) if v, ok := s.Client.LoadOrStore(id, NewClient(muxConn, nil, nil, vs)); ok { v.(*Client).tunnel = muxConn } @@ -263,7 +263,7 @@ func (s *Bridge) typeDeal(typeVal string, c *conn.Conn, id int, vs string) { logs.Error("secret error, failed to match the key successfully") } case common.WORK_FILE: - muxConn := mux.NewMux(c.Conn, s.tunnelType) + muxConn := nps_mux.NewMux(c.Conn, s.tunnelType) if v, ok := s.Client.LoadOrStore(id, NewClient(nil, muxConn, nil, vs)); ok { v.(*Client).file = muxConn } @@ -321,7 +321,7 @@ func (s *Bridge) SendLinkInfo(clientId int, link *conn.Link, t *file.Tunnel) (ta } } } - var tunnel *mux.Mux + var tunnel *nps_mux.Mux if t != nil && t.Mode == "file" { tunnel = v.(*Client).file } else { @@ -407,7 +407,8 @@ loop: }) file.GetDb().JsonDb.Tasks.Range(func(key, value interface{}) bool { v := value.(*file.Tunnel) - if _, ok := s.runList[v.Id]; ok && v.Client.Id == id { + //if _, ok := s.runList[v.Id]; ok && v.Client.Id == id { + if _, ok := s.runList.Load(v.Id); ok && v.Client.Id == id { str += v.Remark + common.CONN_DATA_SEQ } return true diff --git a/build.sh b/build.sh index 6b236c4..afd4e1e 100755 --- a/build.sh +++ b/build.sh @@ -1,5 +1,5 @@ #/bash/sh -export VERSION=0.26.0 +export VERSION=0.26.1 sudo apt-get install gcc-mingw-w64-i686 env GOOS=windows GOARCH=386 CGO_ENABLED=1 CC=i686-w64-mingw32-gcc go build -ldflags "-s -w -extldflags -static -extldflags -static" -buildmode=c-shared -o npc_sdk.dll cmd/npc/sdk.go diff --git a/client/client.go b/client/client.go index 98c918d..99801f2 100755 --- a/client/client.go +++ b/client/client.go @@ -3,6 +3,7 @@ package client import ( "bufio" "bytes" + "ehang.io/nps-mux" "net" "net/http" "strconv" @@ -15,7 +16,6 @@ import ( "ehang.io/nps/lib/config" "ehang.io/nps/lib/conn" "ehang.io/nps/lib/crypt" - "ehang.io/nps/lib/mux" ) type TRPClient struct { @@ -24,7 +24,7 @@ type TRPClient struct { proxyUrl string vKey string p2pAddr map[string]string - tunnel *mux.Mux + tunnel *nps_mux.Mux signal *conn.Conn ticker *time.Ticker cnf *config.Config @@ -138,7 +138,7 @@ func (s *TRPClient) newUdpConn(localAddr, rAddr string, md5Password string) { conn.SetUdpSession(udpTunnel) logs.Trace("successful connection with client ,address %s", udpTunnel.RemoteAddr().String()) //read link info from remote - conn.Accept(mux.NewMux(udpTunnel, s.bridgeConnType), func(c net.Conn) { + conn.Accept(nps_mux.NewMux(udpTunnel, s.bridgeConnType), func(c net.Conn) { go s.handleChan(c) }) break @@ -146,14 +146,14 @@ func (s *TRPClient) newUdpConn(localAddr, rAddr string, md5Password string) { } } -//mux tunnel +//pmux tunnel func (s *TRPClient) newChan() { tunnel, err := NewConn(s.bridgeConnType, s.vKey, s.svrAddr, common.WORK_CHAN, s.proxyUrl) if err != nil { logs.Error("connect to ", s.svrAddr, "error:", err) return } - s.tunnel = mux.NewMux(tunnel.Conn, s.bridgeConnType) + s.tunnel = nps_mux.NewMux(tunnel.Conn, s.bridgeConnType) for { src, err := s.tunnel.Accept() if err != nil { diff --git a/client/local.go b/client/local.go index 13dfb60..b5b1319 100644 --- a/client/local.go +++ b/client/local.go @@ -1,6 +1,7 @@ package client import ( + "ehang.io/nps-mux" "errors" "net" "net/http" @@ -13,7 +14,6 @@ import ( "ehang.io/nps/lib/conn" "ehang.io/nps/lib/crypt" "ehang.io/nps/lib/file" - "ehang.io/nps/lib/mux" "ehang.io/nps/server/proxy" "github.com/astaxie/beego/logs" "github.com/xtaci/kcp-go" @@ -22,7 +22,7 @@ import ( var ( LocalServer []*net.TCPListener udpConn net.Conn - muxSession *mux.Mux + muxSession *nps_mux.Mux fileServer []*http.Server p2pNetBridge *p2pBridge lock sync.RWMutex @@ -73,7 +73,7 @@ func startLocalFileServer(config *config.CommonConfig, t *file.Tunnel, vkey stri } logs.Info("start local file system, local path %s, strip prefix %s ,remote port %s ", t.LocalPath, t.StripPre, t.Ports) fileServer = append(fileServer, srv) - listener := mux.NewMux(remoteConn.Conn, common.CONN_TCP) + listener := nps_mux.NewMux(remoteConn.Conn, common.CONN_TCP) logs.Error(srv.Serve(listener)) } @@ -214,6 +214,6 @@ func newUdpConn(localAddr string, config *config.CommonConfig, l *config.LocalSe logs.Trace("successful create a connection with server", remoteAddress) conn.SetUdpSession(udpTunnel) udpConn = udpTunnel - muxSession = mux.NewMux(udpConn, "kcp") + muxSession = nps_mux.NewMux(udpConn, "kcp") p2pNetBridge = &p2pBridge{} } diff --git a/cmd/npc/npc.go b/cmd/npc/npc.go index cb2efba..309ca68 100644 --- a/cmd/npc/npc.go +++ b/cmd/npc/npc.go @@ -35,6 +35,31 @@ var ( debug = flag.Bool("debug", true, "npc debug") ) +const systemdScript = `[Unit] +Description={{.Description}} +ConditionFileIsExecutable={{.Path|cmdEscape}} +{{range $i, $dep := .Dependencies}} +{{$dep}} {{end}} +[Service] +LimitNOFILE=65536 +StartLimitInterval=5 +StartLimitBurst=10 +ExecStart={{.Path|cmdEscape}}{{range .Arguments}} {{.|cmd}}{{end}} +{{if .ChRoot}}RootDirectory={{.ChRoot|cmd}}{{end}} +{{if .WorkingDirectory}}WorkingDirectory={{.WorkingDirectory|cmdEscape}}{{end}} +{{if .UserName}}User={{.UserName}}{{end}} +{{if .ReloadSignal}}ExecReload=/bin/kill -{{.ReloadSignal}} "$MAINPID"{{end}} +{{if .PIDFile}}PIDFile={{.PIDFile|cmd}}{{end}} +{{if and .LogOutput .HasOutputFileSupport -}} +StandardOutput=file:/var/log/{{.Name}}.out +StandardError=file:/var/log/{{.Name}}.err +{{- end}} +Restart=always +RestartSec=120 +[Install] +WantedBy=multi-user.target +` + func main() { flag.Parse() logs.Reset() @@ -54,8 +79,6 @@ func main() { // init service options := make(service.KeyValue) - options["Restart"] = "on-success" - options["SuccessExitStatus"] = "1 2 8 SIGKILL" svcConfig := &service.Config{ Name: "Npc", DisplayName: "nps内网穿透客户端", @@ -66,6 +89,7 @@ func main() { svcConfig.Dependencies = []string{ "Requires=network.target", "After=network-online.target syslog.target"} + svcConfig.Option["SystemdScript"] = systemdScript } for _, v := range os.Args[1:] { switch v { diff --git a/cmd/nps/nps.go b/cmd/nps/nps.go index 18ea025..e74f344 100644 --- a/cmd/nps/nps.go +++ b/cmd/nps/nps.go @@ -28,6 +28,31 @@ var ( level string ) +const systemdScript = `[Unit] +Description={{.Description}} +ConditionFileIsExecutable={{.Path|cmdEscape}} +{{range $i, $dep := .Dependencies}} +{{$dep}} {{end}} +[Service] +LimitNOFILE=65536 +StartLimitInterval=5 +StartLimitBurst=10 +ExecStart={{.Path|cmdEscape}}{{range .Arguments}} {{.|cmd}}{{end}} +{{if .ChRoot}}RootDirectory={{.ChRoot|cmd}}{{end}} +{{if .WorkingDirectory}}WorkingDirectory={{.WorkingDirectory|cmdEscape}}{{end}} +{{if .UserName}}User={{.UserName}}{{end}} +{{if .ReloadSignal}}ExecReload=/bin/kill -{{.ReloadSignal}} "$MAINPID"{{end}} +{{if .PIDFile}}PIDFile={{.PIDFile|cmd}}{{end}} +{{if and .LogOutput .HasOutputFileSupport -}} +StandardOutput=file:/var/log/{{.Name}}.out +StandardError=file:/var/log/{{.Name}}.err +{{- end}} +Restart=always +RestartSec=120 +[Install] +WantedBy=multi-user.target +` + func main() { flag.Parse() // init log @@ -49,8 +74,6 @@ func main() { } // init service options := make(service.KeyValue) - options["Restart"] = "on-success" - options["SuccessExitStatus"] = "1 2 8 SIGKILL" svcConfig := &service.Config{ Name: "Nps", DisplayName: "nps内网穿透代理服务器", @@ -59,14 +82,15 @@ func main() { } svcConfig.Arguments = append(svcConfig.Arguments, "service") if len(os.Args) > 1 && os.Args[1] == "service" { - logs.SetLogger(logs.AdapterFile, `{"level":`+level+`,"filename":"`+logPath+`","daily":false,"maxlines":100000,"color":true}`) + _ = logs.SetLogger(logs.AdapterFile, `{"level":`+level+`,"filename":"`+logPath+`","daily":false,"maxlines":100000,"color":true}`) } else { - logs.SetLogger(logs.AdapterConsole, `{"level":`+level+`,"color":true}`) + _ = logs.SetLogger(logs.AdapterConsole, `{"level":`+level+`,"color":true}`) } if !common.IsWindows() { svcConfig.Dependencies = []string{ "Requires=network.target", "After=network-online.target syslog.target"} + svcConfig.Option["SystemdScript"] = systemdScript } prg := &nps{} prg.exit = make(chan struct{}) @@ -82,8 +106,8 @@ func main() { return case "install": // uninstall before - service.Control(s, "stop") - service.Control(s, "uninstall") + _ = service.Control(s, "stop") + _ = service.Control(s, "uninstall") binPath := install.InstallNps() svcConfig.Executable = binPath @@ -111,7 +135,7 @@ func main() { return } } - s.Run() + _ = s.Run() } type nps struct { @@ -119,10 +143,12 @@ type nps struct { } func (p *nps) Start(s service.Service) error { - p.run() + _, _ = s.Status() + _ = p.run() return nil } func (p *nps) Stop(s service.Service) error { + _, _ = s.Status() close(p.exit) if service.Interactive() { os.Exit(0) diff --git a/conf/nps.conf b/conf/nps.conf index 01f3e27..cee576f 100755 --- a/conf/nps.conf +++ b/conf/nps.conf @@ -49,7 +49,8 @@ web_key_file=conf/server.key #web_base_url=/nps #Web API unauthenticated IP address(the len of auth_crypt_key must be 16) -auth_key=test +#Remove comments if needed +#auth_key=test auth_crypt_key =1234567812345678 #allow_ports=9001-9009,10001,11000-12000 @@ -73,4 +74,5 @@ system_info_display=false http_cache=false http_cache_length=100 - +#get origin ip +http_add_origin_header=false diff --git a/docs/_coverpage.md b/docs/_coverpage.md index 9bcc15a..8936cfb 100644 --- a/docs/_coverpage.md +++ b/docs/_coverpage.md @@ -1,6 +1,6 @@ ![logo](logo.svg) -# NPS 0.26.0 +# NPS 0.26.1 > 一款轻量级、高性能、功能强大的内网穿透代理服务器 diff --git a/docs/api.md b/docs/api.md index 0357838..f4c6b9a 100644 --- a/docs/api.md +++ b/docs/api.md @@ -1,4 +1,6 @@ # web api + +需要开启请先去掉`nps.conf`中`auth_key`的注释并配置一个合适的密钥 ## webAPI验证说明 - 采用auth_key的验证方式 - 在提交的每个请求后面附带两个参数,`auth_key` 和`timestamp` diff --git a/docs/description.md b/docs/description.md index 72e69b3..6b44fa1 100644 --- a/docs/description.md +++ b/docs/description.md @@ -1,5 +1,6 @@ # 说明 ## 获取用户真实ip +如需使用需要在`nps.conf`中设置`http_add_origin_header=true` 在域名代理模式中,可以通过request请求 header 中的 X-Forwarded-For 和 X-Real-IP 来获取用户真实 IP。 @@ -8,9 +9,6 @@ ## 热更新支持 对于绝大多数配置,在web管理中的修改将实时使用,无需重启客户端或者服务端 -## web端保护 -在一分钟内,如果密码错误次数超过10次,该ip在一分钟内将不能再次登陆。 - ## 客户端地址显示 在web管理中将显示客户端的连接地址 diff --git a/docs/feature.md b/docs/feature.md index b223f76..c3252b9 100644 --- a/docs/feature.md +++ b/docs/feature.md @@ -89,7 +89,7 @@ target_ip=10.1.50.2 ``` ## KCP协议支持 -KCP 是一个快速可靠协议,能以比 TCP浪费10%-20%的带宽的代价,换取平均延迟降低 30%-40%,在弱网环境下对性能能有一定的提升。可在nps.conf中修改`bridge_type`为kcp +在网络质量非常好的情况下,例如专线,内网,可以开启略微降低延迟。如需使用可在nps.conf中修改`bridge_type`为kcp ,设置后本代理将开启udp端口(`bridge_port`) 注意:当服务端为kcp时,客户端连接时也需要使用相同配置,无配置文件模式加上参数type=kcp,配置文件模式在配置文件中设置tp=kcp diff --git a/go.mod b/go.mod index cad2acc..6938d34 100644 --- a/go.mod +++ b/go.mod @@ -3,10 +3,11 @@ module ehang.io/nps go 1.13 require ( + ehang.io/nps-mux v0.0.0-20200116160632-de59baca47b5 fyne.io/fyne v1.2.0 github.com/StackExchange/wmi v0.0.0-20190523213315-cbe66965904d // indirect github.com/astaxie/beego v1.12.0 - github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c // indirect + github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 // indirect github.com/c4milo/unpackit v0.0.0-20170704181138-4ed373e9ef1c github.com/ccding/go-stun v0.0.0-20180726100737-be486d185f3d github.com/dsnet/compress v0.0.1 // indirect @@ -14,20 +15,15 @@ require ( github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db github.com/hooklift/assert v0.0.0-20170704181755-9d1defd6d214 // indirect github.com/kardianos/service v1.0.0 - github.com/klauspost/cpuid v1.2.1 // indirect github.com/klauspost/pgzip v1.2.1 // indirect - github.com/klauspost/reedsolomon v1.9.2 // indirect github.com/panjf2000/ants/v2 v2.2.2 - github.com/pkg/errors v0.8.1 + github.com/pkg/errors v0.9.1 github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 // indirect github.com/shirou/gopsutil v2.19.11+incompatible - github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 // indirect - github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b // indirect - github.com/tjfoc/gmsm v1.0.1 // indirect - github.com/xtaci/kcp-go v5.4.4+incompatible - github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae // indirect - golang.org/x/net v0.0.0-20181220203305-927f97764cc3 - golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa // indirect + github.com/xtaci/kcp-go v5.4.20+incompatible + golang.org/x/crypto v0.0.0-20200117160349-530e935923ad // indirect + golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa + golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c // indirect ) replace github.com/astaxie/beego => github.com/exfly/beego v1.12.0-export-init diff --git a/go.sum b/go.sum index a1c8f67..f72e449 100644 --- a/go.sum +++ b/go.sum @@ -1,3 +1,5 @@ +ehang.io/nps-mux v0.0.0-20200116160632-de59baca47b5 h1:gbYMN0t1mroAtodN9t7rFRqAYtBGQpqjPNaJ/zFGmD8= +ehang.io/nps-mux v0.0.0-20200116160632-de59baca47b5/go.mod h1:v2gdtoMBRGYe5y9mSBwPw6V4V/2Zz5GyTuCNlsUPHkY= fyne.io/fyne v1.2.0 h1:mdp7Cs7QmSJTeazYxEDa9wWeJNig7paBcjm0dooFtLE= fyne.io/fyne v1.2.0/go.mod h1:Ab+3DIB/FVteW0y4DXfmZv4N3JdnCBh2lHkINI02BOU= github.com/Knetic/govaluate v3.0.0+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0= @@ -9,8 +11,8 @@ github.com/akavel/rsrc v0.8.0/go.mod h1:uLoCtb9J+EyAqh+26kdrTgmzRBFPGOolLWKpdxkK github.com/beego/goyaml2 v0.0.0-20130207012346-5545475820dd/go.mod h1:1b+Y/CofkYwXMUU0OhQqGvsY2Bvgr4j6jfT699wyZKQ= github.com/beego/x2j v0.0.0-20131220205130-a0352aadc542/go.mod h1:kSeGC/p1AbBiEp5kat81+DSQrZenVBZXklMLaELspWU= github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737/go.mod h1:PmM6Mmwb0LSuEubjR8N7PtNe1KxZLtOUHtbeikc5h60= -github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c h1:FUUopH4brHNO2kJoNN3pV+OBEYmgraLT/KHZrMM69r0= -github.com/bradfitz/iter v0.0.0-20190303215204-33e6a9893b0c/go.mod h1:PyRFw1Lt2wKX4ZVSQ2mk+PeDa1rxyObEDlApuIsUKuo= +github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8 h1:GKTyiRCL6zVf5wWaqKnf+7Qs6GbEPfd4iMOitWzXJx8= +github.com/bradfitz/iter v0.0.0-20191230175014-e8f45d346db8/go.mod h1:spo1JLcs67NmW1aVLEgtA8Yy1elc+X8y5SRW1sFW4Og= github.com/c4milo/unpackit v0.0.0-20170704181138-4ed373e9ef1c h1:aprLqMn7gSPT+vdDSl+/E6NLEuArwD/J7IWd8bJt5lQ= github.com/c4milo/unpackit v0.0.0-20170704181138-4ed373e9ef1c/go.mod h1:Ie6SubJv/NTO9Q0UBH0QCl3Ve50lu9hjbi5YJUw03TE= github.com/casbin/casbin v1.7.0/go.mod h1:c67qKN6Oum3UF5Q1+BByfFxkwKvhwW57ITjqwtzR1KE= @@ -56,12 +58,12 @@ github.com/kardianos/service v1.0.0/go.mod h1:8CzDhVuCuugtsHyZoTvsOBuvonN/UDBvl0 github.com/klauspost/compress v1.4.1 h1:8VMb5+0wMgdBykOV96DwNwKFQ+WTI4pzYURP99CcB9E= github.com/klauspost/compress v1.4.1/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/klauspost/cpuid v1.2.0/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= -github.com/klauspost/cpuid v1.2.1 h1:vJi+O/nMdFt0vqm8NZBI6wzALWdA2X+egi0ogNyrC/w= -github.com/klauspost/cpuid v1.2.1/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= +github.com/klauspost/cpuid v1.2.2 h1:1xAgYebNnsb9LKCdLOvFWtAxGU/33mjJtyOVbmUa0Us= +github.com/klauspost/cpuid v1.2.2/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/pgzip v1.2.1 h1:oIPZROsWuPHpOdMVWLuJZXwgjhrW8r1yEX8UqMyeNHM= github.com/klauspost/pgzip v1.2.1/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= -github.com/klauspost/reedsolomon v1.9.2 h1:E9CMS2Pqbv+C7tsrYad4YC9MfhnMVWhMRsTi7U0UB18= -github.com/klauspost/reedsolomon v1.9.2/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= +github.com/klauspost/reedsolomon v1.9.3 h1:N/VzgeMfHmLc+KHMD1UL/tNkfXAt8FnUqlgXGIduwAY= +github.com/klauspost/reedsolomon v1.9.3/go.mod h1:CwCi+NUr9pqSVktrkN+Ondf06rkhYZ/pcNv7fu+8Un4= github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= github.com/mattn/go-sqlite3 v1.10.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= github.com/nfnt/resize v0.0.0-20180221191011-83c6a9932646 h1:zYyBkD/k9seD2A7fsi6Oo2LfFZAehjjQMERAvZLEDnQ= @@ -73,6 +75,8 @@ github.com/pkg/errors v0.8.0 h1:WdK/asTD0HN+q6hsWO3/vpuAkAr+tw6aNJNDFFf0+qw= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= +github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= +github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/shiena/ansicolor v0.0.0-20151119151921-a422bbe96644 h1:X+yvsM2yrEktyI+b2qND5gpH8YhURn0k8OCaeRnkINo= @@ -95,19 +99,25 @@ github.com/stretchr/testify v1.3.1-0.20190311161405-34c6fa2dc709/go.mod h1:M5WIy github.com/syndtr/goleveldb v0.0.0-20181127023241-353a9fca669c/go.mod h1:Z4AUp2Km+PwemOoO/VB5AOx9XSsIItzFjoJlOSiYmn0= github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161 h1:89CEmDvlq/F7SJEOqkIdNDGJXrQIhuIx9D2DBXjavSU= github.com/templexxx/cpufeat v0.0.0-20180724012125-cef66df7f161/go.mod h1:wM7WEvslTq+iOEAMDLSzhVuOt5BRZ05WirO+b09GHQU= -github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b h1:mnG1fcsIB1d/3vbkBak2MM0u+vhGhlQwpeimUi7QncM= -github.com/templexxx/xor v0.0.0-20181023030647-4e92f724b73b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4= -github.com/tjfoc/gmsm v1.0.1 h1:R11HlqhXkDospckjZEihx9SW/2VW0RgdwrykyWMFOQU= -github.com/tjfoc/gmsm v1.0.1/go.mod h1:XxO4hdhhrzAd+G4CjDqaOkd0hUzmtPR/d3EiBBMn/wc= +github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b h1:fj5tQ8acgNUr6O8LEplsxDhUIe2573iLkJc+PqnzZTI= +github.com/templexxx/xor v0.0.0-20191217153810-f85b25db303b/go.mod h1:5XA7W9S6mni3h5uvOC75dA3m9CCCaS83lltmc0ukdi4= +github.com/tjfoc/gmsm v1.2.0 h1:oTXUFetR8GphwGmUUxWFxrRZJTaDcZo1Lt2mRxlVzEI= +github.com/tjfoc/gmsm v1.2.0/go.mod h1:HaUcFuY0auTiaHB9MHFGCPx5IaLhTUd2atbCFBQXn9w= github.com/ulikunitz/xz v0.5.6 h1:jGHAfXawEGZQ3blwU5wnWKQJvAraT7Ftq9EXjnXYgt8= github.com/ulikunitz/xz v0.5.6/go.mod h1:2bypXElzHzzJZwzH67Y6wb67pO62Rzfn7BSiF4ABRW8= github.com/wendal/errors v0.0.0-20130201093226-f66c77a7882b/go.mod h1:Q12BUT7DqIlHRmgv3RskH+UCM/4eqVMgI0EMmlSpAXc= github.com/xtaci/kcp-go v5.4.4+incompatible h1:QIJ0a0Q0N1G20yLHL2+fpdzyy2v/Cb3PI+xiwx/KK9c= github.com/xtaci/kcp-go v5.4.4+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE= +github.com/xtaci/kcp-go v5.4.20+incompatible h1:TN1uey3Raw0sTz0Fg8GkfM0uH3YwzhnZWQ1bABv5xAg= +github.com/xtaci/kcp-go v5.4.20+incompatible/go.mod h1:bN6vIwHQbfHaHtFpEssmWsN45a+AZwO7eyRCmEIbtvE= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae h1:J0GxkO96kL4WF+AIT3M4mfUVinOCPgf2uUWYFUzN0sM= github.com/xtaci/lossyconn v0.0.0-20190602105132-8df528c0c9ae/go.mod h1:gXtu8J62kEgmN++bm9BVICuT/e8yiLI2KFobd/TRFsE= golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85 h1:et7+NAX3lLIk5qUCTA9QelBjGE/NkhzYw/mhnr0s7nI= golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20191219195013-becbf705a915/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= +golang.org/x/crypto v0.0.0-20200117160349-530e935923ad h1:Jh8cai0fqIK+f6nG0UgPW5wFk8wmiMhM3AyciDBdtQg= +golang.org/x/crypto v0.0.0-20200117160349-530e935923ad/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8 h1:idBdZTd9UioThJp8KpM/rTSinK/ChZFBE43/WtIy8zg= golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/image v0.0.0-20190227222117-0694c2d4d067/go.mod h1:kZ7UVZpmo3dzQBMxlp+ypCbDeSB+sBbTgSJuh5dn5js= @@ -117,13 +127,16 @@ golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028 h1:4+4C/Iv2U4fMZBiMCc98MG golang.org/x/mobile v0.0.0-20190719004257-d2bd2a29d028/go.mod h1:E/iHnbuqvinMTCcRqshq8CkpyQDoeVncDDYHnLhea+o= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a h1:gOpx8G595UYyvj8UK4+OFyY4rx037g3fmfhe5SasG3U= golang.org/x/net v0.0.0-20181114220301-adae6a3d119a/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20181220203305-927f97764cc3 h1:eH6Eip3UpmR+yM/qI9Ijluzb1bNv/cAU/n+6l8tRSis= -golang.org/x/net v0.0.0-20181220203305-927f97764cc3/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA= +golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= golang.org/x/sys v0.0.0-20190204203706-41f3e6584952/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190312061237-fead79001313/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190626221950-04f50cda93cb/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa h1:KIDDMLT1O0Nr7TSxp8xM5tJcdn8tgyAONntO829og1M= -golang.org/x/sys v0.0.0-20190804053845-51ab0e2deafa/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c h1:gUYreENmqtjZb2brVfUas1sC6UivSY8XwKwPo8tloLs= +golang.org/x/sys v0.0.0-20200117145432-59e60aa80a0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= diff --git a/gui/npc/AndroidManifest.xml b/gui/npc/AndroidManifest.xml index 6543fbd..550151f 100755 --- a/gui/npc/AndroidManifest.xml +++ b/gui/npc/AndroidManifest.xml @@ -2,7 +2,7 @@ diff --git a/lib/common/const.go b/lib/common/const.go index 2f195f3..ffb2fa6 100644 --- a/lib/common/const.go +++ b/lib/common/const.go @@ -36,19 +36,3 @@ WWW-Authenticate: Basic realm="easyProxy" ` ) - -const ( - MUX_PING_FLAG uint8 = iota - MUX_NEW_CONN_OK - MUX_NEW_CONN_Fail - MUX_NEW_MSG - MUX_NEW_MSG_PART - MUX_MSG_SEND_OK - MUX_NEW_CONN - MUX_CONN_CLOSE - MUX_PING_RETURN - MUX_PING int32 = -1 - MAXIMUM_SEGMENT_SIZE = PoolSizeWindow - MAXIMUM_WINDOW_SIZE = 1 << 27 // 1<<31-1 TCP slide window size is very large, - // we use 128M, reduce memory usage -) diff --git a/lib/common/netpackager.go b/lib/common/netpackager.go index 45456c5..b54121f 100644 --- a/lib/common/netpackager.go +++ b/lib/common/netpackager.go @@ -3,13 +3,11 @@ package common import ( "bytes" "encoding/binary" - "encoding/json" "errors" "io" "io/ioutil" "net" "strconv" - "strings" ) type NetPackager interface { @@ -17,222 +15,6 @@ type NetPackager interface { UnPack(reader io.Reader) (err error) } -type BasePackager struct { - Length uint16 - Content []byte -} - -func (Self *BasePackager) NewPac(contents ...interface{}) (err error) { - Self.clean() - for _, content := range contents { - switch content.(type) { - case nil: - Self.Content = Self.Content[:0] - case []byte: - err = Self.appendByte(content.([]byte)) - case string: - err = Self.appendByte([]byte(content.(string))) - if err != nil { - return - } - err = Self.appendByte([]byte(CONN_DATA_SEQ)) - default: - err = Self.marshal(content) - } - } - Self.setLength() - if Self.Length > MAXIMUM_SEGMENT_SIZE { - err = errors.New("mux:packer: newpack content segment too large") - } - return -} - -func (Self *BasePackager) appendByte(data []byte) (err error) { - m := len(Self.Content) - n := m + len(data) - if n <= cap(Self.Content) { - Self.Content = Self.Content[0:n] // grow the length for copy - copy(Self.Content[m:n], data) - return nil - } else { - return errors.New("pack content too large") - } -} - -//似乎这里涉及到父类作用域问题,当子类调用父类的方法时,其struct仅仅为父类的 -func (Self *BasePackager) Pack(writer io.Writer) (err error) { - err = binary.Write(writer, binary.LittleEndian, Self.Length) - if err != nil { - return - } - err = binary.Write(writer, binary.LittleEndian, Self.Content) - return -} - -//Unpack 会导致传入的数字类型转化成float64!! -//主要原因是json unmarshal并未传入正确的数据类型 -func (Self *BasePackager) UnPack(reader io.Reader) (n uint16, err error) { - Self.clean() - n += 2 // uint16 - err = binary.Read(reader, binary.LittleEndian, &Self.Length) - if err != nil { - return - } - if int(Self.Length) > cap(Self.Content) { - err = errors.New("unpack err, content length too large") - return - } - if Self.Length > MAXIMUM_SEGMENT_SIZE { - err = errors.New("mux:packer: unpack content segment too large") - return - } - Self.Content = Self.Content[:int(Self.Length)] - //n, err := io.ReadFull(reader, Self.Content) - //if n != int(Self.Length) { - // err = io.ErrUnexpectedEOF - //} - err = binary.Read(reader, binary.LittleEndian, Self.Content) - n += Self.Length - return -} - -func (Self *BasePackager) marshal(content interface{}) (err error) { - tmp, err := json.Marshal(content) - if err != nil { - return err - } - err = Self.appendByte(tmp) - return -} - -func (Self *BasePackager) Unmarshal(content interface{}) (err error) { - err = json.Unmarshal(Self.Content, content) - if err != nil { - return err - } - return -} - -func (Self *BasePackager) setLength() { - Self.Length = uint16(len(Self.Content)) - return -} - -func (Self *BasePackager) clean() { - Self.Length = 0 - Self.Content = Self.Content[:0] // reset length -} - -func (Self *BasePackager) Split() (strList []string) { - n := bytes.IndexByte(Self.Content, 0) - strList = strings.Split(string(Self.Content[:n]), CONN_DATA_SEQ) - strList = strList[0 : len(strList)-1] - return -} - -type ConnPackager struct { - // Todo - ConnType uint8 - BasePackager -} - -func (Self *ConnPackager) NewPac(connType uint8, content ...interface{}) (err error) { - Self.ConnType = connType - err = Self.BasePackager.NewPac(content...) - return -} - -func (Self *ConnPackager) Pack(writer io.Writer) (err error) { - err = binary.Write(writer, binary.LittleEndian, Self.ConnType) - if err != nil { - return - } - err = Self.BasePackager.Pack(writer) - return -} - -func (Self *ConnPackager) UnPack(reader io.Reader) (n uint16, err error) { - err = binary.Read(reader, binary.LittleEndian, &Self.ConnType) - if err != nil && err != io.EOF { - return - } - n, err = Self.BasePackager.UnPack(reader) - n += 2 - return -} - -type MuxPackager struct { - Flag uint8 - Id int32 - Window uint64 - BasePackager -} - -func (Self *MuxPackager) NewPac(flag uint8, id int32, content ...interface{}) (err error) { - Self.Flag = flag - Self.Id = id - switch flag { - case MUX_PING_FLAG, MUX_PING_RETURN, MUX_NEW_MSG, MUX_NEW_MSG_PART: - Self.Content = WindowBuff.Get() - err = Self.BasePackager.NewPac(content...) - //logs.Warn(Self.Length, string(Self.Content)) - case MUX_MSG_SEND_OK: - // MUX_MSG_SEND_OK contains one data - Self.Window = content[0].(uint64) - } - return -} - -func (Self *MuxPackager) Pack(writer io.Writer) (err error) { - err = binary.Write(writer, binary.LittleEndian, Self.Flag) - if err != nil { - return - } - err = binary.Write(writer, binary.LittleEndian, Self.Id) - if err != nil { - return - } - switch Self.Flag { - case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: - err = Self.BasePackager.Pack(writer) - WindowBuff.Put(Self.Content) - case MUX_MSG_SEND_OK: - err = binary.Write(writer, binary.LittleEndian, Self.Window) - } - return -} - -func (Self *MuxPackager) UnPack(reader io.Reader) (n uint16, err error) { - err = binary.Read(reader, binary.LittleEndian, &Self.Flag) - if err != nil { - return - } - err = binary.Read(reader, binary.LittleEndian, &Self.Id) - if err != nil { - return - } - switch Self.Flag { - case MUX_NEW_MSG, MUX_NEW_MSG_PART, MUX_PING_FLAG, MUX_PING_RETURN: - Self.Content = WindowBuff.Get() // need get a window buf from pool - Self.BasePackager.clean() // also clean the content - n, err = Self.BasePackager.UnPack(reader) - //logs.Warn("unpack", Self.Length, string(Self.Content)) - case MUX_MSG_SEND_OK: - err = binary.Read(reader, binary.LittleEndian, &Self.Window) - n += 8 // uint64 - } - n += 5 //uint8 int32 - return -} - -func (Self *MuxPackager) reset() { - Self.Id = 0 - Self.Flag = 0 - Self.Length = 0 - Self.Content = nil - Self.Window = 0 -} - const ( ipV4 = 1 domainName = 3 diff --git a/lib/common/pool.go b/lib/common/pool.go index 31931f9..8a487aa 100644 --- a/lib/common/pool.go +++ b/lib/common/pool.go @@ -1,7 +1,6 @@ package common import ( - "bytes" "sync" ) @@ -9,8 +8,6 @@ const PoolSize = 64 * 1024 const PoolSizeSmall = 100 const PoolSizeUdp = 1472 + 200 const PoolSizeCopy = 32 << 10 -const PoolSizeBuffer = 4096 -const PoolSizeWindow = PoolSizeBuffer - 2 - 4 - 4 - 1 var BufPool = sync.Pool{ New: func() interface{} { @@ -86,115 +83,11 @@ func (Self *copyBufferPool) Put(x []byte) { } } -type windowBufferPool struct { - pool sync.Pool -} - -func (Self *windowBufferPool) New() { - Self.pool = sync.Pool{ - New: func() interface{} { - return make([]byte, PoolSizeWindow) - }, - } -} - -func (Self *windowBufferPool) Get() (buf []byte) { - buf = Self.pool.Get().([]byte) - buf = buf[:PoolSizeWindow] - return buf -} - -func (Self *windowBufferPool) Put(x []byte) { - x = x[:0] // clean buf - Self.pool.Put(x) -} - -type bufferPool struct { - pool sync.Pool -} - -func (Self *bufferPool) New() { - Self.pool = sync.Pool{ - New: func() interface{} { - return bytes.NewBuffer(make([]byte, 0, PoolSizeBuffer)) - }, - } -} - -func (Self *bufferPool) Get() *bytes.Buffer { - return Self.pool.Get().(*bytes.Buffer) -} - -func (Self *bufferPool) Put(x *bytes.Buffer) { - x.Reset() - Self.pool.Put(x) -} - -type muxPackagerPool struct { - pool sync.Pool -} - -func (Self *muxPackagerPool) New() { - Self.pool = sync.Pool{ - New: func() interface{} { - pack := MuxPackager{} - return &pack - }, - } -} - -func (Self *muxPackagerPool) Get() *MuxPackager { - return Self.pool.Get().(*MuxPackager) -} - -func (Self *muxPackagerPool) Put(pack *MuxPackager) { - pack.reset() - Self.pool.Put(pack) -} - -type ListElement struct { - Buf []byte - L uint16 - Part bool -} - -type listElementPool struct { - pool sync.Pool -} - -func (Self *listElementPool) New() { - Self.pool = sync.Pool{ - New: func() interface{} { - element := ListElement{} - return &element - }, - } -} - -func (Self *listElementPool) Get() *ListElement { - return Self.pool.Get().(*ListElement) -} - -func (Self *listElementPool) Put(element *ListElement) { - element.L = 0 - element.Buf = nil - element.Part = false - Self.pool.Put(element) -} - var once = sync.Once{} -var BuffPool = bufferPool{} var CopyBuff = copyBufferPool{} -var MuxPack = muxPackagerPool{} -var WindowBuff = windowBufferPool{} -var ListElementPool = listElementPool{} func newPool() { - BuffPool.New() CopyBuff.New() - MuxPack.New() - WindowBuff.New() - ListElementPool.New() } func init() { diff --git a/lib/common/util.go b/lib/common/util.go index 71e604e..dce3c8b 100755 --- a/lib/common/util.go +++ b/lib/common/util.go @@ -98,7 +98,7 @@ func Getverifyval(vkey string) string { } //Change headers and host of request -func ChangeHostAndHeader(r *http.Request, host string, header string, addr string) { +func ChangeHostAndHeader(r *http.Request, host string, header string, addr string,addOrigin bool) { if host != "" { r.Host = host } @@ -115,8 +115,10 @@ func ChangeHostAndHeader(r *http.Request, host string, header string, addr strin if prior, ok := r.Header["X-Forwarded-For"]; ok { addr = strings.Join(prior, ", ") + ", " + addr } - r.Header.Set("X-Forwarded-For", addr) - r.Header.Set("X-Real-IP", addr) + if addOrigin { + r.Header.Set("X-Forwarded-For", addr) + r.Header.Set("X-Real-IP", addr) + } } //Read file content by file path diff --git a/lib/conn/conn.go b/lib/conn/conn.go index c1ca1b1..cf29acb 100755 --- a/lib/conn/conn.go +++ b/lib/conn/conn.go @@ -19,7 +19,7 @@ import ( "ehang.io/nps/lib/common" "ehang.io/nps/lib/crypt" "ehang.io/nps/lib/file" - "ehang.io/nps/lib/mux" + "ehang.io/nps/lib/pmux" "ehang.io/nps/lib/rate" "github.com/xtaci/kcp-go" ) @@ -126,8 +126,8 @@ func (s *Conn) SetAlive(tp string) { conn.SetReadDeadline(time.Time{}) //conn.SetKeepAlive(false) //conn.SetKeepAlivePeriod(time.Duration(2 * time.Second)) - case *mux.PortConn: - s.Conn.(*mux.PortConn).SetReadDeadline(time.Time{}) + case *pmux.PortConn: + s.Conn.(*pmux.PortConn).SetReadDeadline(time.Time{}) } } @@ -138,8 +138,8 @@ func (s *Conn) SetReadDeadlineBySecond(t time.Duration) { s.Conn.(*kcp.UDPSession).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second)) case *net.TCPConn: s.Conn.(*net.TCPConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second)) - case *mux.PortConn: - s.Conn.(*mux.PortConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second)) + case *pmux.PortConn: + s.Conn.(*pmux.PortConn).SetReadDeadline(time.Now().Add(time.Duration(t) * time.Second)) } } diff --git a/lib/file/db.go b/lib/file/db.go index 50be394..8552d5e 100644 --- a/lib/file/db.go +++ b/lib/file/db.go @@ -4,7 +4,6 @@ import ( "errors" "fmt" "net/http" - "regexp" "sort" "strings" "sync" @@ -328,13 +327,16 @@ func (s *DbUtils) GetInfoByHost(host string, r *http.Request) (h *Host, err erro } //Remove http(s) http(s)://a.proxy.com //*.proxy.com *.a.proxy.com Do some pan-parsing - tmp := strings.Replace(v.Host, "*", `\w+?`, -1) - var re *regexp.Regexp - if re, err = regexp.Compile(tmp); err != nil { + if v.Scheme != "all" && v.Scheme != r.URL.Scheme { return true } - if len(re.FindAllString(host, -1)) > 0 && (v.Scheme == "all" || v.Scheme == r.URL.Scheme) { - //URL routing + tmpHost := v.Host + if strings.Contains(tmpHost, "*") { + tmpHost = strings.Replace(tmpHost, "*", "", -1) + if strings.Contains(host, tmpHost) { + hosts = append(hosts, v) + } + } else if v.Host == host { hosts = append(hosts, v) } return true diff --git a/lib/mux/bytes.go b/lib/mux/bytes.go deleted file mode 100644 index c44bad4..0000000 --- a/lib/mux/bytes.go +++ /dev/null @@ -1,32 +0,0 @@ -package mux - -import ( - "bytes" - "encoding/binary" - "io" -) - -//write bytes with int32 length -func WriteLenBytes(buf []byte, w io.Writer) (int, error) { - raw := bytes.NewBuffer([]byte{}) - if err := binary.Write(raw, binary.LittleEndian, int32(len(buf))); err != nil { - return 0, err - } - if err := binary.Write(raw, binary.LittleEndian, buf); err != nil { - return 0, err - } - return w.Write(raw.Bytes()) -} - -//read bytes by length -func ReadLenBytes(buf []byte, r io.Reader) (int, error) { - var l uint32 - var err error - if binary.Read(r, binary.LittleEndian, &l) != nil { - return 0, err - } - if _, err = io.ReadFull(r, buf[:l]); err != nil { - return 0, err - } - return int(l), nil -} diff --git a/lib/mux/conn.go b/lib/mux/conn.go deleted file mode 100644 index 592de29..0000000 --- a/lib/mux/conn.go +++ /dev/null @@ -1,722 +0,0 @@ -package mux - -import ( - "ehang.io/nps/lib/common" - "errors" - "github.com/astaxie/beego/logs" - "io" - "math" - "net" - "runtime" - "sync" - "sync/atomic" - "time" -) - -type conn struct { - net.Conn - getStatusCh chan struct{} - connStatusOkCh chan struct{} - connStatusFailCh chan struct{} - connId int32 - isClose bool - closeFlag bool // close conn flag - receiveWindow *ReceiveWindow - sendWindow *SendWindow - once sync.Once - //label string -} - -func NewConn(connId int32, mux *Mux, label ...string) *conn { - c := &conn{ - getStatusCh: make(chan struct{}), - connStatusOkCh: make(chan struct{}), - connStatusFailCh: make(chan struct{}), - connId: connId, - receiveWindow: new(ReceiveWindow), - sendWindow: new(SendWindow), - once: sync.Once{}, - } - //if len(label) > 0 { - // c.label = label[0] - //} - c.receiveWindow.New(mux) - c.sendWindow.New(mux) - //logm := &connLog{ - // startTime: time.Now(), - // isClose: false, - // logs: []string{c.label + "new conn success"}, - //} - //setM(label[0], int(connId), logm) - return c -} - -func (s *conn) Read(buf []byte) (n int, err error) { - if s.isClose || buf == nil { - return 0, errors.New("the conn has closed") - } - if len(buf) == 0 { - return 0, nil - } - // waiting for takeout from receive window finish or timeout - //now := time.Now() - n, err = s.receiveWindow.Read(buf, s.connId) - //t := time.Now().Sub(now) - //if t.Seconds() > 0.5 { - //logs.Warn("conn read long", n, t.Seconds()) - //} - //var errstr string - //if err == nil { - // errstr = "err:nil" - //} else { - // errstr = err.Error() - //} - //d := getM(s.label, int(s.connId)) - //d.logs = append(d.logs, s.label+"read "+strconv.Itoa(n)+" "+errstr+" "+string(buf[:100])) - //setM(s.label, int(s.connId), d) - return -} - -func (s *conn) Write(buf []byte) (n int, err error) { - if s.isClose { - return 0, errors.New("the conn has closed") - } - if s.closeFlag { - //s.Close() - return 0, errors.New("io: write on closed conn") - } - if len(buf) == 0 { - return 0, nil - } - //logs.Warn("write buf", len(buf)) - //now := time.Now() - n, err = s.sendWindow.WriteFull(buf, s.connId) - //t := time.Now().Sub(now) - //if t.Seconds() > 0.5 { - // logs.Warn("conn write long", n, t.Seconds()) - //} - return -} - -func (s *conn) Close() (err error) { - s.once.Do(s.closeProcess) - return -} - -func (s *conn) closeProcess() { - s.isClose = true - s.receiveWindow.mux.connMap.Delete(s.connId) - if !s.receiveWindow.mux.IsClose { - // if server or user close the conn while reading, will get a io.EOF - // and this Close method will be invoke, send this signal to close other side - s.receiveWindow.mux.sendInfo(common.MUX_CONN_CLOSE, s.connId, nil) - } - s.sendWindow.CloseWindow() - s.receiveWindow.CloseWindow() - //d := getM(s.label, int(s.connId)) - //d.isClose = true - //d.logs = append(d.logs, s.label+"close "+time.Now().String()) - //setM(s.label, int(s.connId), d) - return -} - -func (s *conn) LocalAddr() net.Addr { - return s.receiveWindow.mux.conn.LocalAddr() -} - -func (s *conn) RemoteAddr() net.Addr { - return s.receiveWindow.mux.conn.RemoteAddr() -} - -func (s *conn) SetDeadline(t time.Time) error { - _ = s.SetReadDeadline(t) - _ = s.SetWriteDeadline(t) - return nil -} - -func (s *conn) SetReadDeadline(t time.Time) error { - s.receiveWindow.SetTimeOut(t) - return nil -} - -func (s *conn) SetWriteDeadline(t time.Time) error { - s.sendWindow.SetTimeOut(t) - return nil -} - -type window struct { - maxSizeDone uint64 - // 64bit alignment - // maxSizeDone contains 4 parts - // 1 31 1 31 - // wait maxSize useless done - // wait zero means false, one means true - off uint32 - closeOp bool - closeOpCh chan struct{} - mux *Mux -} - -const windowBits = 31 -const waitBits = dequeueBits + windowBits -const mask1 = 1 -const mask31 = 1<> dequeueBits) & mask31) - done = uint32(ptrs & mask31) - //logs.Warn("unpack", maxSize, done) - if ((ptrs >> waitBits) & mask1) == 1 { - wait = true - return - } - return -} - -func (Self *window) pack(maxSize, done uint32, wait bool) uint64 { - //logs.Warn("pack", maxSize, done, wait) - if wait { - return (uint64(1)< 0 { - n = uint32(l) - } - return -} - -func (Self *ReceiveWindow) calcSize() { - // calculating maximum receive window size - if Self.count == 0 { - //logs.Warn("ping, bw", Self.mux.latency, Self.bw.Get()) - //conns := Self.mux.connMap.Size() - muxBw := Self.mux.bw.Get() - connBw := Self.bw.Get() - //logs.Warn("muxbw connbw", muxBw, connBw) - var n uint32 - if connBw > 0 && muxBw > 0 { - n = uint32(math.Float64frombits(atomic.LoadUint64(&Self.mux.latency)) * - (muxBw + connBw)) - } - //logs.Warn(n) - if n < common.MAXIMUM_SEGMENT_SIZE*30 { - //logs.Warn("window small", n, Self.mux.bw.Get(), Self.bw.Get()) - n = common.MAXIMUM_SEGMENT_SIZE * 30 - } - for { - ptrs := atomic.LoadUint64(&Self.maxSizeDone) - size, read, wait := Self.unpack(ptrs) - if n < size/2 { - n = size / 2 - // half reduce - } - // set the minimal size - if n > 2*size { - n = 2 * size - // twice grow - } - if connBw > 0 && muxBw > 0 { - limit := uint32(common.MAXIMUM_WINDOW_SIZE * (connBw / (muxBw + connBw))) - if n > limit { - logs.Warn("window too large, calculated:", n, "limit:", limit, connBw, muxBw) - n = limit - } - } - // set the maximum size - //logs.Warn("n", n) - if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(n, read, wait)) { - // only change the maxSize - break - } - } - Self.count = -10 - } - Self.count += 1 - return -} - -func (Self *ReceiveWindow) Write(buf []byte, l uint16, part bool, id int32) (err error) { - if Self.closeOp { - return errors.New("conn.receiveWindow: write on closed window") - } - element, err := NewListElement(buf, l, part) - //logs.Warn("push the buf", len(buf), l, element.L) - if err != nil { - return - } - Self.calcSize() // calculate the max window size - var wait bool - var maxSize, read uint32 -start: - ptrs := atomic.LoadUint64(&Self.maxSizeDone) - maxSize, read, wait = Self.unpack(ptrs) - remain := Self.remainingSize(maxSize, l) - // calculate the remaining window size now, plus the element we will push - if remain == 0 && !wait { - //logs.Warn("window full true", remaining) - wait = true - if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) { - // only change the wait status, not send the read size - goto start - // another goroutine change the status, make sure shall we need wait - } - //logs.Warn("receive window full") - } else if !wait { - if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, wait)) { - // reset read size here, and send the read size directly - goto start - // another goroutine change the status, make sure shall we need wait - } - } // maybe there are still some data received even if window is full, just keep the wait status - // and push into queue. when receive window read enough, send window will be acknowledged. - Self.bufQueue.Push(element) - // status check finish, now we can push the element into the queue - if !wait { - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false)) - // send the current status to send window - } - return nil -} - -func (Self *ReceiveWindow) Read(p []byte, id int32) (n int, err error) { - if Self.closeOp { - return 0, io.EOF // receive close signal, returns eof - } - Self.bw.StartRead() - n, err = Self.readFromQueue(p, id) - Self.bw.SetCopySize(uint16(n)) - return -} - -func (Self *ReceiveWindow) readFromQueue(p []byte, id int32) (n int, err error) { - pOff := 0 - l := 0 - //logs.Warn("receive window read off, element.l", Self.off, Self.element.L) -copyData: - if Self.off == uint32(Self.element.L) { - // on the first Read method invoked, Self.off and Self.element.l - // both zero value - common.ListElementPool.Put(Self.element) - if Self.closeOp { - return 0, io.EOF - } - Self.element, err = Self.bufQueue.Pop() - // if the queue is empty, Pop method will wait until one element push - // into the queue successful, or timeout. - // timer start on timeout parameter is set up - Self.off = 0 - if err != nil { - Self.CloseWindow() // also close the window, to avoid read twice - return // queue receive stop or time out, break the loop and return - } - //logs.Warn("pop element", Self.element.L, Self.element.Part) - } - l = copy(p[pOff:], Self.element.Buf[Self.off:Self.element.L]) - pOff += l - Self.off += uint32(l) - //logs.Warn("window read length buf len", Self.readLength, Self.bufQueue.Len()) - n += l - l = 0 - if Self.off == uint32(Self.element.L) { - //logs.Warn("put the element end ", string(Self.element.buf[:15])) - common.WindowBuff.Put(Self.element.Buf) - Self.sendStatus(id, Self.element.L) - // check the window full status - } - if pOff < len(p) && Self.element.Part { - // element is a part of the segments, trying to fill up buf p - goto copyData - } - return // buf p is full or all of segments in buf, return -} - -func (Self *ReceiveWindow) sendStatus(id int32, l uint16) { - var maxSize, read uint32 - var wait bool - for { - ptrs := atomic.LoadUint64(&Self.maxSizeDone) - maxSize, read, wait = Self.unpack(ptrs) - if read <= (read+uint32(l))&mask31 { - read += uint32(l) - remain := Self.remainingSize(maxSize, 0) - if wait && remain > 0 || read >= maxSize/2 || remain == maxSize { - if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, 0, false)) { - // now we get the current window status success - // receive window free up some space we need acknowledge send window, also reset the read size - // still having a condition that receive window is empty and not send the status to send window - // so send the status here - //logs.Warn("receive window free up some space", remain) - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false)) - break - } - } else { - if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, read, wait)) { - // receive window not into the wait status, or still not having any space now, - // just change the read size - break - } - } - } else { - //overflow - if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, uint32(l), wait)) { - // reset to l - Self.mux.sendInfo(common.MUX_MSG_SEND_OK, id, Self.pack(maxSize, read, false)) - break - } - } - runtime.Gosched() - // another goroutine change remaining or wait status, make sure - } - return -} - -func (Self *ReceiveWindow) SetTimeOut(t time.Time) { - // waiting for FIFO queue Pop method - Self.bufQueue.SetTimeOut(t) -} - -func (Self *ReceiveWindow) Stop() { - // queue has no more data to push, so unblock pop method - Self.once.Do(Self.bufQueue.Stop) -} - -func (Self *ReceiveWindow) CloseWindow() { - Self.window.CloseWindow() - Self.Stop() - Self.release() -} - -func (Self *ReceiveWindow) release() { - //if Self.element != nil { - // if Self.element.Buf != nil { - // common.WindowBuff.Put(Self.element.Buf) - // } - // common.ListElementPool.Put(Self.element) - //} - for { - ele := Self.bufQueue.TryPop() - if ele == nil { - return - } - if ele.Buf != nil { - common.WindowBuff.Put(ele.Buf) - } - common.ListElementPool.Put(ele) - } // release resource -} - -type SendWindow struct { - window - buf []byte - setSizeCh chan struct{} - timeout time.Time - // send window receive the receive window max size and read size - // done size store the size send window has send, send and read will be totally equal - // so send minus read, send window can get the current window size remaining -} - -func (Self *SendWindow) New(mux *Mux) { - Self.setSizeCh = make(chan struct{}) - Self.maxSizeDone = Self.pack(common.MAXIMUM_SEGMENT_SIZE*30, 0, false) - Self.mux = mux - Self.window.New() -} - -func (Self *SendWindow) SetSendBuf(buf []byte) { - // send window buff from conn write method, set it to send window - Self.buf = buf - Self.off = 0 -} - -func (Self *SendWindow) remainingSize(maxSize, send uint32) uint32 { - l := int64(maxSize&mask31) - int64(send&mask31) - if l > 0 { - return uint32(l) - } - return 0 -} - -func (Self *SendWindow) SetSize(currentMaxSizeDone uint64) (closed bool) { - // set the window size from receive window - defer func() { - if recover() != nil { - closed = true - } - }() - if Self.closeOp { - close(Self.setSizeCh) - return true - } - //logs.Warn("set send window size to ", windowSize, newRemaining) - var maxsize, send uint32 - var wait, newWait bool - currentMaxSize, read, _ := Self.unpack(currentMaxSizeDone) - for { - ptrs := atomic.LoadUint64(&Self.maxSizeDone) - maxsize, send, wait = Self.unpack(ptrs) - if read > send { - logs.Error("window read > send: max size:", currentMaxSize, "read:", read, "send", send) - return - } - if read == 0 && currentMaxSize == maxsize { - return - } - send -= read - remain := Self.remainingSize(currentMaxSize, send) - if remain == 0 && wait { - // just keep the wait status - newWait = true - } - // remain > 0, change wait to false. or remain == 0, wait is false, just keep it - if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(currentMaxSize, send, newWait)) { - break - } - // anther goroutine change wait status or window size - } - if wait && !newWait { - // send window into the wait status, need notice the channel - //logs.Warn("send window allow") - Self.allow() - } - // send window not into the wait status, so just do slide - return false -} - -func (Self *SendWindow) allow() { - select { - case Self.setSizeCh <- struct{}{}: - //logs.Warn("send window remaining size is 0 finish") - return - case <-Self.closeOpCh: - close(Self.setSizeCh) - return - } -} - -func (Self *SendWindow) sent(sentSize uint32) { - var maxSie, send uint32 - var wait bool - for { - ptrs := atomic.LoadUint64(&Self.maxSizeDone) - maxSie, send, wait = Self.unpack(ptrs) - if (send+sentSize)&mask31 < send { - // overflow - runtime.Gosched() - continue - } - if atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSie, send+sentSize, wait)) { - // set the send size - //logs.Warn("sent", maxSie, send+sentSize, wait) - break - } - } -} - -func (Self *SendWindow) WriteTo() (p []byte, sendSize uint32, part bool, err error) { - // returns buf segments, return only one segments, need a loop outside - // until err = io.EOF - if Self.closeOp { - return nil, 0, false, errors.New("conn.writeWindow: window closed") - } - if Self.off == uint32(len(Self.buf)) { - return nil, 0, false, io.EOF - // send window buff is drain, return eof and get another one - } - var maxSize, send uint32 -start: - ptrs := atomic.LoadUint64(&Self.maxSizeDone) - maxSize, send, _ = Self.unpack(ptrs) - remain := Self.remainingSize(maxSize, send) - if remain == 0 { - if !atomic.CompareAndSwapUint64(&Self.maxSizeDone, ptrs, Self.pack(maxSize, send, true)) { - // just change the status wait status - goto start // another goroutine change the window, try again - } - // into the wait status - //logs.Warn("send window into wait status") - err = Self.waitReceiveWindow() - if err != nil { - return nil, 0, false, err - } - //logs.Warn("rem into wait finish") - goto start - } - // there are still remaining window - //logs.Warn("rem", remain, maxSize, send) - if len(Self.buf[Self.off:]) > common.MAXIMUM_SEGMENT_SIZE { - sendSize = common.MAXIMUM_SEGMENT_SIZE - //logs.Warn("cut buf by mss") - } else { - sendSize = uint32(len(Self.buf[Self.off:])) - } - if remain < sendSize { - // usable window size is small than - // window MAXIMUM_SEGMENT_SIZE or send buf left - sendSize = remain - //logs.Warn("cut buf by remainingsize", sendSize, len(Self.buf[Self.off:])) - } - //logs.Warn("send size", sendSize) - if sendSize < uint32(len(Self.buf[Self.off:])) { - part = true - } - p = Self.buf[Self.off : sendSize+Self.off] - Self.off += sendSize - Self.sent(sendSize) - return -} - -func (Self *SendWindow) waitReceiveWindow() (err error) { - t := Self.timeout.Sub(time.Now()) - if t < 0 { // not set the timeout, wait for it as long as connection close - select { - case _, ok := <-Self.setSizeCh: - if !ok { - return errors.New("conn.writeWindow: window closed") - } - return nil - case <-Self.closeOpCh: - return errors.New("conn.writeWindow: window closed") - } - } - timer := time.NewTimer(t) - defer timer.Stop() - // waiting for receive usable window size, or timeout - select { - case _, ok := <-Self.setSizeCh: - if !ok { - return errors.New("conn.writeWindow: window closed") - } - return nil - case <-timer.C: - return errors.New("conn.writeWindow: write to time out") - case <-Self.closeOpCh: - return errors.New("conn.writeWindow: window closed") - } -} - -func (Self *SendWindow) WriteFull(buf []byte, id int32) (n int, err error) { - Self.SetSendBuf(buf) // set the buf to send window - //logs.Warn("set the buf to send window") - var bufSeg []byte - var part bool - var l uint32 - for { - bufSeg, l, part, err = Self.WriteTo() - //logs.Warn("buf seg", len(bufSeg), part, err) - // get the buf segments from send window - if bufSeg == nil && part == false && err == io.EOF { - // send window is drain, break the loop - err = nil - break - } - if err != nil { - break - } - n += int(l) - l = 0 - if part { - Self.mux.sendInfo(common.MUX_NEW_MSG_PART, id, bufSeg) - } else { - Self.mux.sendInfo(common.MUX_NEW_MSG, id, bufSeg) - //logs.Warn("buf seg sent", len(bufSeg), part, err) - } - // send to other side, not send nil data to other side - } - //logs.Warn("buf seg write success") - return -} - -func (Self *SendWindow) SetTimeOut(t time.Time) { - // waiting for receive a receive window size - Self.timeout = t -} - -type writeBandwidth struct { - writeBW uint64 // store in bits, but it's float64 - readEnd time.Time - duration float64 - bufLength uint32 -} - -const writeCalcThreshold uint32 = 5 * 1024 * 1024 - -func NewWriteBandwidth() *writeBandwidth { - return &writeBandwidth{} -} - -func (Self *writeBandwidth) StartRead() { - if Self.readEnd.IsZero() { - Self.readEnd = time.Now() - } - Self.duration += time.Now().Sub(Self.readEnd).Seconds() - if Self.bufLength >= writeCalcThreshold { - Self.calcBandWidth() - } -} - -func (Self *writeBandwidth) SetCopySize(n uint16) { - Self.bufLength += uint32(n) - Self.endRead() -} - -func (Self *writeBandwidth) endRead() { - Self.readEnd = time.Now() -} - -func (Self *writeBandwidth) calcBandWidth() { - atomic.StoreUint64(&Self.writeBW, math.Float64bits(float64(Self.bufLength)/Self.duration)) - Self.bufLength = 0 - Self.duration = 0 -} - -func (Self *writeBandwidth) Get() (bw float64) { - // The zero value, 0 for numeric types - bw = math.Float64frombits(atomic.LoadUint64(&Self.writeBW)) - if bw <= 0 { - bw = 0 - } - return -} diff --git a/lib/mux/map.go b/lib/mux/map.go deleted file mode 100644 index 86d09b5..0000000 --- a/lib/mux/map.go +++ /dev/null @@ -1,75 +0,0 @@ -package mux - -import ( - "sync" -) - -type connMap struct { - connMap map[int32]*conn - //closeCh chan struct{} - sync.RWMutex -} - -func NewConnMap() *connMap { - connMap := &connMap{ - connMap: make(map[int32]*conn), - //closeCh: make(chan struct{}), - } - //go connMap.clean() - return connMap -} - -func (s *connMap) Size() (n int) { - s.Lock() - n = len(s.connMap) - s.Unlock() - return -} - -func (s *connMap) Get(id int32) (*conn, bool) { - s.Lock() - v, ok := s.connMap[id] - s.Unlock() - if ok && v != nil { - return v, true - } - return nil, false -} - -func (s *connMap) Set(id int32, v *conn) { - s.Lock() - s.connMap[id] = v - s.Unlock() -} - -func (s *connMap) Close() { - //s.closeCh <- struct{}{} // stop the clean goroutine first - for _, v := range s.connMap { - v.Close() // close all the connections in the mux - } -} - -func (s *connMap) Delete(id int32) { - s.Lock() - delete(s.connMap, id) - s.Unlock() -} - -//func (s *connMap) clean() { -// ticker := time.NewTimer(time.Minute * 1) -// for { -// select { -// case <-ticker.C: -// s.Lock() -// for _, v := range s.connMap { -// if v.isClose { -// delete(s.connMap, v.connId) -// } -// } -// s.Unlock() -// case <-s.closeCh: -// ticker.Stop() -// return -// } -// } -//} diff --git a/lib/mux/mux.go b/lib/mux/mux.go deleted file mode 100644 index 5b591ed..0000000 --- a/lib/mux/mux.go +++ /dev/null @@ -1,535 +0,0 @@ -package mux - -import ( - "errors" - "io" - "math" - "net" - "os" - "sync/atomic" - "time" - - "ehang.io/nps/lib/common" - "github.com/astaxie/beego/logs" -) - -type Mux struct { - latency uint64 // we store latency in bits, but it's float64 - net.Listener - conn net.Conn - connMap *connMap - newConnCh chan *conn - id int32 - closeChan chan struct{} - IsClose bool - pingOk uint32 - counter *latencyCounter - bw *bandwidth - pingCh chan []byte - pingCheckTime uint32 - connType string - writeQueue PriorityQueue - newConnQueue ConnQueue -} - -func NewMux(c net.Conn, connType string) *Mux { - //c.(*net.TCPConn).SetReadBuffer(0) - //c.(*net.TCPConn).SetWriteBuffer(0) - _ = c.SetDeadline(time.Time{}) - fd, err := getConnFd(c) - if err != nil { - logs.Warn(err) - } - m := &Mux{ - conn: c, - connMap: NewConnMap(), - id: 0, - closeChan: make(chan struct{}, 1), - newConnCh: make(chan *conn), - bw: NewBandwidth(fd), - IsClose: false, - connType: connType, - pingCh: make(chan []byte), - counter: newLatencyCounter(), - } - m.writeQueue.New() - m.newConnQueue.New() - //read session by flag - m.readSession() - //ping - m.ping() - m.pingReturn() - m.writeSession() - return m -} - -func (s *Mux) NewConn() (*conn, error) { - if s.IsClose { - return nil, errors.New("the mux has closed") - } - conn := NewConn(s.getId(), s, "nps ") - //it must be set before send - s.connMap.Set(conn.connId, conn) - s.sendInfo(common.MUX_NEW_CONN, conn.connId, nil) - //set a timer timeout 30 second - timer := time.NewTimer(time.Minute * 2) - defer timer.Stop() - select { - case <-conn.connStatusOkCh: - return conn, nil - case <-conn.connStatusFailCh: - case <-timer.C: - } - return nil, errors.New("create connection fail,the server refused the connection") -} - -func (s *Mux) Accept() (net.Conn, error) { - if s.IsClose { - return nil, errors.New("accpet error,the mux has closed") - } - conn := <-s.newConnCh - if conn == nil { - return nil, errors.New("accpet error,the conn has closed") - } - return conn, nil -} - -func (s *Mux) Addr() net.Addr { - return s.conn.LocalAddr() -} - -func (s *Mux) sendInfo(flag uint8, id int32, data ...interface{}) { - if s.IsClose { - return - } - var err error - pack := common.MuxPack.Get() - err = pack.NewPac(flag, id, data...) - if err != nil { - common.MuxPack.Put(pack) - logs.Error("mux: new pack err", err) - s.Close() - return - } - s.writeQueue.Push(pack) - return -} - -func (s *Mux) writeSession() { - go s.packBuf() - //go s.writeBuf() -} - -func (s *Mux) packBuf() { - //buffer := common.BuffPool.Get() - for { - if s.IsClose { - break - } - //buffer.Reset() - pack := s.writeQueue.Pop() - if s.IsClose { - break - } - //buffer := common.BuffPool.Get() - err := pack.Pack(s.conn) - common.MuxPack.Put(pack) - if err != nil { - logs.Error("mux: pack err", err) - //common.BuffPool.Put(buffer) - s.Close() - break - } - //logs.Warn(buffer.String()) - //s.bufQueue.Push(buffer) - //l := buffer.Len() - //n, err := buffer.WriteTo(s.conn) - //common.BuffPool.Put(buffer) - //if err != nil || int(n) != l { - // logs.Error("mux: close from write session fail ", err, n, l) - // s.Close() - // break - //} - } -} - -//func (s *Mux) writeBuf() { -// for { -// if s.IsClose { -// break -// } -// buffer, err := s.bufQueue.Pop() -// if err != nil { -// break -// } -// l := buffer.Len() -// n, err := buffer.WriteTo(s.conn) -// common.BuffPool.Put(buffer) -// if err != nil || int(n) != l { -// logs.Warn("close from write session fail ", err, n, l) -// s.Close() -// break -// } -// } -//} - -func (s *Mux) ping() { - go func() { - now, _ := time.Now().UTC().MarshalText() - s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) - // send the ping flag and get the latency first - ticker := time.NewTicker(time.Second * 5) - defer ticker.Stop() - for { - if s.IsClose { - break - } - select { - case <-ticker.C: - } - if atomic.LoadUint32(&s.pingCheckTime) >= 60 { - logs.Error("mux: ping time out") - s.Close() - // more than 5 minutes not receive the ping return package, - // mux conn is damaged, maybe a packet drop, close it - break - } - now, _ := time.Now().UTC().MarshalText() - s.sendInfo(common.MUX_PING_FLAG, common.MUX_PING, now) - atomic.AddUint32(&s.pingCheckTime, 1) - if atomic.LoadUint32(&s.pingOk) > 10 && s.connType == "kcp" { - logs.Error("mux: kcp ping err") - s.Close() - break - } - atomic.AddUint32(&s.pingOk, 1) - } - return - }() -} - -func (s *Mux) pingReturn() { - go func() { - var now time.Time - var data []byte - for { - if s.IsClose { - break - } - select { - case data = <-s.pingCh: - atomic.StoreUint32(&s.pingCheckTime, 0) - case <-s.closeChan: - return - } - _ = now.UnmarshalText(data) - latency := time.Now().UTC().Sub(now).Seconds() / 2 - if latency > 0 { - atomic.StoreUint64(&s.latency, math.Float64bits(s.counter.Latency(latency))) - // convert float64 to bits, store it atomic - } - //logs.Warn("latency", math.Float64frombits(atomic.LoadUint64(&s.latency))) - if cap(data) > 0 { - common.WindowBuff.Put(data) - } - } - }() -} - -func (s *Mux) readSession() { - go func() { - var connection *conn - for { - if s.IsClose { - break - } - connection = s.newConnQueue.Pop() - if s.IsClose { - break // make sure that is closed - } - s.connMap.Set(connection.connId, connection) //it has been set before send ok - s.newConnCh <- connection - s.sendInfo(common.MUX_NEW_CONN_OK, connection.connId, nil) - } - }() - go func() { - pack := common.MuxPack.Get() - var l uint16 - var err error - for { - if s.IsClose { - break - } - pack = common.MuxPack.Get() - s.bw.StartRead() - if l, err = pack.UnPack(s.conn); err != nil { - logs.Error("mux: read session unpack from connection err", err) - s.Close() - break - } - s.bw.SetCopySize(l) - atomic.StoreUint32(&s.pingOk, 0) - switch pack.Flag { - case common.MUX_NEW_CONN: //new connection - connection := NewConn(pack.Id, s) - s.newConnQueue.Push(connection) - continue - case common.MUX_PING_FLAG: //ping - s.sendInfo(common.MUX_PING_RETURN, common.MUX_PING, pack.Content) - common.WindowBuff.Put(pack.Content) - continue - case common.MUX_PING_RETURN: - //go func(content []byte) { - s.pingCh <- pack.Content - //}(pack.Content) - continue - } - if connection, ok := s.connMap.Get(pack.Id); ok && !connection.isClose { - switch pack.Flag { - case common.MUX_NEW_MSG, common.MUX_NEW_MSG_PART: //new msg from remote connection - err = s.newMsg(connection, pack) - if err != nil { - logs.Error("mux: read session connection new msg err", err) - connection.Close() - } - continue - case common.MUX_NEW_CONN_OK: //connection ok - connection.connStatusOkCh <- struct{}{} - continue - case common.MUX_NEW_CONN_Fail: - connection.connStatusFailCh <- struct{}{} - continue - case common.MUX_MSG_SEND_OK: - if connection.isClose { - continue - } - connection.sendWindow.SetSize(pack.Window) - continue - case common.MUX_CONN_CLOSE: //close the connection - connection.closeFlag = true - //s.connMap.Delete(pack.Id) - //go func(connection *conn) { - connection.receiveWindow.Stop() // close signal to receive window - //}(connection) - continue - } - } else if pack.Flag == common.MUX_CONN_CLOSE { - continue - } - common.MuxPack.Put(pack) - } - common.MuxPack.Put(pack) - s.Close() - }() -} - -func (s *Mux) newMsg(connection *conn, pack *common.MuxPackager) (err error) { - if connection.isClose { - err = io.ErrClosedPipe - return - } - //logs.Warn("read session receive new msg", pack.Length) - //go func(connection *conn, pack *common.MuxPackager) { // do not block read session - //insert into queue - if pack.Flag == common.MUX_NEW_MSG_PART { - err = connection.receiveWindow.Write(pack.Content, pack.Length, true, pack.Id) - } - if pack.Flag == common.MUX_NEW_MSG { - err = connection.receiveWindow.Write(pack.Content, pack.Length, false, pack.Id) - } - //logs.Warn("read session write success", pack.Length) - return -} - -func (s *Mux) Close() (err error) { - logs.Warn("close mux") - if s.IsClose { - return errors.New("the mux has closed") - } - s.IsClose = true - s.connMap.Close() - s.connMap = nil - //s.bufQueue.Stop() - s.closeChan <- struct{}{} - close(s.newConnCh) - err = s.conn.Close() - s.release() - return -} - -func (s *Mux) release() { - for { - pack := s.writeQueue.TryPop() - if pack == nil { - break - } - if pack.BasePackager.Content != nil { - common.WindowBuff.Put(pack.BasePackager.Content) - } - common.MuxPack.Put(pack) - } - for { - connection := s.newConnQueue.TryPop() - if connection == nil { - break - } - connection = nil - } - s.writeQueue.Stop() - s.newConnQueue.Stop() -} - -//get new connId as unique flag -func (s *Mux) getId() (id int32) { - //Avoid going beyond the scope - if (math.MaxInt32 - s.id) < 10000 { - atomic.StoreInt32(&s.id, 0) - } - id = atomic.AddInt32(&s.id, 1) - if _, ok := s.connMap.Get(id); ok { - return s.getId() - } - return -} - -type bandwidth struct { - readBandwidth uint64 // store in bits, but it's float64 - readStart time.Time - lastReadStart time.Time - bufLength uint32 - fd *os.File - calcThreshold uint32 -} - -func NewBandwidth(fd *os.File) *bandwidth { - return &bandwidth{fd: fd} -} - -func (Self *bandwidth) StartRead() { - if Self.readStart.IsZero() { - Self.readStart = time.Now() - } - if Self.bufLength >= Self.calcThreshold { - Self.lastReadStart, Self.readStart = Self.readStart, time.Now() - Self.calcBandWidth() - } -} - -func (Self *bandwidth) SetCopySize(n uint16) { - Self.bufLength += uint32(n) -} - -func (Self *bandwidth) calcBandWidth() { - t := Self.readStart.Sub(Self.lastReadStart) - bufferSize, err := sysGetSock(Self.fd) - //logs.Warn(bufferSize) - if err != nil { - logs.Warn(err) - Self.bufLength = 0 - return - } - if Self.bufLength >= uint32(bufferSize) { - atomic.StoreUint64(&Self.readBandwidth, math.Float64bits(float64(Self.bufLength)/t.Seconds())) - // calculate the whole socket buffer, the time meaning to fill the buffer - //logs.Warn(Self.Get()) - } else { - Self.calcThreshold = uint32(bufferSize) - } - // socket buffer size is bigger than bufLength, so we don't calculate it - Self.bufLength = 0 -} - -func (Self *bandwidth) Get() (bw float64) { - // The zero value, 0 for numeric types - bw = math.Float64frombits(atomic.LoadUint64(&Self.readBandwidth)) - if bw <= 0 { - bw = 0 - } - //logs.Warn(bw) - return -} - -const counterBits = 4 -const counterMask = 1<> counterBits) & counterMask) - // we set head is 4 bits - min = uint8(idxs & counterMask) - return -} - -func (Self *latencyCounter) pack(head, min uint8) uint8 { - return uint8(head< value { - min = head - } - head++ - Self.headMin = Self.pack(head, min) -} - -func (Self *latencyCounter) minimal() (min uint8) { - var val float64 - var i uint8 - for i = 0; i < counterMask; i++ { - if Self.buf[i] > 0 { - if val > Self.buf[i] { - val = Self.buf[i] - min = i - } - } - } - return -} - -func (Self *latencyCounter) Latency(value float64) (latency float64) { - Self.add(value) - _, min := Self.unpack(Self.headMin) - latency = Self.buf[min] * Self.countSuccess() - return -} - -const lossRatio = 1.6 - -func (Self *latencyCounter) countSuccess() (successRate float64) { - var success, loss, i uint8 - _, min := Self.unpack(Self.headMin) - for i = 0; i < counterMask; i++ { - if Self.buf[i] > lossRatio*Self.buf[min] && Self.buf[i] > 0 { - loss++ - } - if Self.buf[i] <= lossRatio*Self.buf[min] && Self.buf[i] > 0 { - success++ - } - } - // counting all the data in the ring buf, except zero - successRate = float64(success) / float64(loss+success) - return -} diff --git a/lib/mux/mux_test.go b/lib/mux/mux_test.go deleted file mode 100644 index a3ed3fc..0000000 --- a/lib/mux/mux_test.go +++ /dev/null @@ -1,453 +0,0 @@ -package mux - -import ( - "bufio" - "ehang.io/nps/lib/common" - "ehang.io/nps/lib/goroutine" - "fmt" - "io" - "log" - "net" - "net/http" - "net/http/httputil" - _ "net/http/pprof" - "strconv" - "testing" - "time" - "unsafe" - - "github.com/astaxie/beego/logs" -) - -var conn1 net.Conn -var conn2 net.Conn - -func TestNewMux(t *testing.T) { - go func() { - http.ListenAndServe("0.0.0.0:8889", nil) - }() - logs.EnableFuncCallDepth(true) - logs.SetLogFuncCallDepth(3) - server() - client() - //poolConnCopy, _ := ants.NewPoolWithFunc(200000, common.copyConn, ants.WithNonblocking(false)) - time.Sleep(time.Second * 3) - go func() { - m2 := NewMux(conn2, "tcp") - //m2 := NewMux(conn2, "kcp") - for { - //logs.Warn("npc starting accept") - c, err := m2.Accept() - if err != nil { - logs.Warn(err) - continue - } - //logs.Warn("npc accept success ") - c2, err := net.Dial("tcp", "127.0.0.1:80") - if err != nil { - logs.Warn(err) - c.Close() - continue - } - //c2.(*net.TCPConn).SetReadBuffer(0) - //c2.(*net.TCPConn).SetReadBuffer(0) - _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(c, c2, nil)) - //go func(c2 net.Conn, c *conn) { - // wg := new(sync.WaitGroup) - // wg.Add(2) - // _ = poolConnCopy.Invoke(common.newConnGroup(c2, c, wg)) - // //go func() { - // // _, err = common.CopyBuffer(c2, c) - // // if err != nil { - // // c2.Close() - // // c.Close() - // // //logs.Warn("close npc by copy from nps", err, c.connId) - // // } - // // wg.Done() - // //}() - // //wg.Add(1) - // _ = poolConnCopy.Invoke(common.newConnGroup(c, c2, wg)) - // //go func() { - // // _, err = common.CopyBuffer(c, c2) - // // if err != nil { - // // c2.Close() - // // c.Close() - // // //logs.Warn("close npc by copy from server", err, c.connId) - // // } - // // wg.Done() - // //}() - // //logs.Warn("npc wait") - // wg.Wait() - //}(c2, c.(*conn)) - } - }() - - go func() { - m1 := NewMux(conn1, "tcp") - //m1 := NewMux(conn1, "kcp") - l, err := net.Listen("tcp", "127.0.0.1:7777") - if err != nil { - logs.Warn(err) - } - for { - //logs.Warn("nps starting accept") - conns, err := l.Accept() - if err != nil { - logs.Warn(err) - continue - } - //conns.(*net.TCPConn).SetReadBuffer(0) - //conns.(*net.TCPConn).SetReadBuffer(0) - //logs.Warn("nps accept success starting new conn") - tmpCpnn, err := m1.NewConn() - if err != nil { - logs.Warn("nps new conn err ", err) - continue - } - //logs.Warn("nps new conn success ", tmpCpnn.connId) - _ = goroutine.CopyConnsPool.Invoke(goroutine.NewConns(tmpCpnn, conns, nil)) - //go func(tmpCpnn *conn, conns net.Conn) { - // wg := new(sync.WaitGroup) - // wg.Add(2) - // _ = poolConnCopy.Invoke(common.newConnGroup(tmpCpnn, conns, wg)) - // //go func() { - // // _, err := common.CopyBuffer(tmpCpnn, conns) - // // if err != nil { - // // conns.Close() - // // tmpCpnn.Close() - // // //logs.Warn("close nps by copy from user", tmpCpnn.connId, err) - // // } - // //}() - // //wg.Add(1) - // _ = poolConnCopy.Invoke(common.newConnGroup(conns, tmpCpnn, wg)) - // //time.Sleep(time.Second) - // //_, err = common.CopyBuffer(conns, tmpCpnn) - // //if err != nil { - // // conns.Close() - // // tmpCpnn.Close() - // // //logs.Warn("close nps by copy from npc ", tmpCpnn.connId, err) - // //} - // wg.Wait() - //}(tmpCpnn, conns) - } - }() - - //go NewLogServer() - time.Sleep(time.Second * 5) - //for i := 0; i < 1; i++ { - // go test_raw(i) - //} - //test_request() - - for { - time.Sleep(time.Second * 5) - } -} - -func server() { - var err error - l, err := net.Listen("tcp", "127.0.0.1:9999") - //l, err := kcp.Listen("127.0.0.1:9999") - if err != nil { - logs.Warn(err) - } - go func() { - conn1, err = l.Accept() - //logs.Info("accept", conn1) - if err != nil { - logs.Warn(err) - } - }() - return -} - -func client() { - var err error - conn2, err = net.Dial("tcp", "127.0.0.1:9999") - //logs.Warn("dial") - //conn2, err = kcp.Dial("127.0.0.1:9999") - if err != nil { - logs.Warn(err) - } -} - -func test_request() { - conn, _ := net.Dial("tcp", "127.0.0.1:7777") - for i := 0; i < 1000; i++ { - conn.Write([]byte(`GET / HTTP/1.1 -Host: 127.0.0.1:7777 -Connection: keep-alive - - -`)) - r, err := http.ReadResponse(bufio.NewReader(conn), nil) - if err != nil { - logs.Warn("close by read response err", err) - break - } - logs.Warn("read response success", r) - b, err := httputil.DumpResponse(r, true) - if err != nil { - logs.Warn("close by dump response err", err) - break - } - fmt.Println(string(b[:20]), err) - //time.Sleep(time.Second) - } - logs.Warn("finish") -} - -func test_raw(k int) { - for i := 0; i < 1000; i++ { - ti := time.Now() - conn, err := net.Dial("tcp", "127.0.0.1:7777") - if err != nil { - logs.Warn("conn dial err", err) - } - tid := time.Now() - conn.Write([]byte(`GET /videojs5/video.js HTTP/1.1 -Host: 127.0.0.1:7777 - - -`)) - tiw := time.Now() - buf := make([]byte, 3572) - n, err := io.ReadFull(conn, buf) - //n, err := conn.Read(buf) - if err != nil { - logs.Warn("close by read response err", err) - break - } - logs.Warn(n, string(buf[:50]), "\n--------------\n", string(buf[n-50:n])) - //time.Sleep(time.Second) - err = conn.Close() - if err != nil { - logs.Warn("close conn err ", err) - } - now := time.Now() - du := now.Sub(ti).Seconds() - dud := now.Sub(tid).Seconds() - duw := now.Sub(tiw).Seconds() - if du > 1 { - logs.Warn("duration long", du, dud, duw, k, i) - } - if n != 3572 { - logs.Warn("n loss", n, string(buf)) - } - } - logs.Warn("finish") -} - -func TestNewConn(t *testing.T) { - buf := common.GetBufPoolCopy() - logs.Warn(len(buf), cap(buf)) - //b := pool.GetBufPoolCopy() - //b[0] = 1 - //b[1] = 2 - //b[2] = 3 - b := []byte{1, 2, 3} - logs.Warn(copy(buf[:3], b), len(buf), cap(buf)) - logs.Warn(len(buf), buf[0]) -} - -func TestDQueue(t *testing.T) { - logs.EnableFuncCallDepth(true) - logs.SetLogFuncCallDepth(3) - d := new(bufDequeue) - d.vals = make([]unsafe.Pointer, 8) - go func() { - time.Sleep(time.Second) - for i := 0; i < 10; i++ { - logs.Warn(i) - logs.Warn(d.popTail()) - } - }() - go func() { - time.Sleep(time.Second) - for i := 0; i < 10; i++ { - data := "test" - go logs.Warn(i, unsafe.Pointer(&data), d.pushHead(unsafe.Pointer(&data))) - } - }() - time.Sleep(time.Second * 3) -} - -func TestChain(t *testing.T) { - go func() { - log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) - }() - logs.EnableFuncCallDepth(true) - logs.SetLogFuncCallDepth(3) - time.Sleep(time.Second * 5) - d := new(bufChain) - d.new(256) - go func() { - time.Sleep(time.Second) - for i := 0; i < 30000; i++ { - unsa, ok := d.popTail() - str := (*string)(unsa) - if ok { - fmt.Println(i, str, *str, ok) - //logs.Warn(i, str, *str, ok) - } else { - fmt.Println("nil", i, ok) - //logs.Warn("nil", i, ok) - } - } - }() - go func() { - time.Sleep(time.Second) - for i := 0; i < 3000; i++ { - go func(i int) { - for n := 0; n < 10; n++ { - data := "test " + strconv.Itoa(i) + strconv.Itoa(n) - fmt.Println(data, unsafe.Pointer(&data)) - //logs.Warn(data, unsafe.Pointer(&data)) - d.pushHead(unsafe.Pointer(&data)) - } - }(i) - } - }() - time.Sleep(time.Second * 100000) -} - -func TestFIFO(t *testing.T) { - go func() { - log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) - }() - logs.EnableFuncCallDepth(true) - logs.SetLogFuncCallDepth(3) - time.Sleep(time.Second * 5) - d := new(ReceiveWindowQueue) - d.New() - go func() { - time.Sleep(time.Second) - for i := 0; i < 1001; i++ { - data, err := d.Pop() - if err == nil { - //fmt.Println(i, string(data.buf), err) - logs.Warn(i, string(data.Buf), err) - common.ListElementPool.Put(data) - } else { - //fmt.Println("err", err) - logs.Warn("err", err) - } - //logs.Warn(d.Len()) - } - logs.Warn("pop finish") - }() - go func() { - time.Sleep(time.Second * 10) - for i := 0; i < 1000; i++ { - by := []byte("test " + strconv.Itoa(i) + " ") // - data, _ := NewListElement(by, uint16(len(by)), true) - //fmt.Println(string((*data).buf), data) - //logs.Warn(string((*data).buf), data) - d.Push(data) - } - }() - time.Sleep(time.Second * 100000) -} - -func TestPriority(t *testing.T) { - go func() { - log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) - }() - logs.EnableFuncCallDepth(true) - logs.SetLogFuncCallDepth(3) - time.Sleep(time.Second * 5) - d := new(PriorityQueue) - d.New() - go func() { - time.Sleep(time.Second) - for i := 0; i < 360050; i++ { - data := d.Pop() - //fmt.Println(i, string(data.buf), err) - logs.Warn(i, string(data.Content), data) - } - logs.Warn("pop finish") - }() - go func() { - time.Sleep(time.Second * 10) - for i := 0; i < 30000; i++ { - go func(i int) { - for n := 0; n < 10; n++ { - data := new(common.MuxPackager) - by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) - _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by) - //fmt.Println(string((*data).buf), data) - logs.Warn(string((*data).Content), data) - d.Push(data) - } - }(i) - go func(i int) { - data := new(common.MuxPackager) - _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil) - //fmt.Println(string((*data).buf), data) - logs.Warn(data) - d.Push(data) - }(i) - go func(i int) { - data := new(common.MuxPackager) - _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil) - //fmt.Println(string((*data).buf), data) - logs.Warn(data) - d.Push(data) - }(i) - } - }() - time.Sleep(time.Second * 100000) -} - -//func TestReceive(t *testing.T) { -// go func() { -// log.Println(http.ListenAndServe("0.0.0.0:8889", nil)) -// }() -// logs.EnableFuncCallDepth(true) -// logs.SetLogFuncCallDepth(3) -// time.Sleep(time.Second * 5) -// mux := new(Mux) -// mux.bw.readBandwidth = float64(1*1024*1024) -// mux.latency = float64(1/1000) -// wind := new(ReceiveWindow) -// wind.New(mux) -// wind. -// go func() { -// time.Sleep(time.Second) -// for i := 0; i < 36000; i++ { -// data := d.Pop() -// //fmt.Println(i, string(data.buf), err) -// logs.Warn(i, string(data.Content), data) -// } -// }() -// go func() { -// time.Sleep(time.Second*10) -// for i := 0; i < 3000; i++ { -// go func(i int) { -// for n := 0; n < 10; n++{ -// data := new(common.MuxPackager) -// by := []byte("test " + strconv.Itoa(i) + strconv.Itoa(n)) -// _ = data.NewPac(common.MUX_NEW_MSG_PART, int32(i), by) -// //fmt.Println(string((*data).buf), data) -// logs.Warn(string((*data).Content), data) -// d.Push(data) -// } -// }(i) -// go func(i int) { -// data := new(common.MuxPackager) -// _ = data.NewPac(common.MUX_NEW_CONN, int32(i), nil) -// //fmt.Println(string((*data).buf), data) -// logs.Warn(data) -// d.Push(data) -// }(i) -// go func(i int) { -// data := new(common.MuxPackager) -// _ = data.NewPac(common.MUX_NEW_CONN_OK, int32(i), nil) -// //fmt.Println(string((*data).buf), data) -// logs.Warn(data) -// d.Push(data) -// }(i) -// } -// }() -// time.Sleep(time.Second * 100000) -//} diff --git a/lib/mux/queue.go b/lib/mux/queue.go deleted file mode 100644 index b7cbb0b..0000000 --- a/lib/mux/queue.go +++ /dev/null @@ -1,592 +0,0 @@ -package mux - -import ( - "ehang.io/nps/lib/common" - "errors" - "io" - "math" - "runtime" - "sync" - "sync/atomic" - "time" - "unsafe" -) - -type PriorityQueue struct { - highestChain *bufChain - middleChain *bufChain - lowestChain *bufChain - starving uint8 - stop bool - cond *sync.Cond -} - -func (Self *PriorityQueue) New() { - Self.highestChain = new(bufChain) - Self.highestChain.new(4) - Self.middleChain = new(bufChain) - Self.middleChain.new(32) - Self.lowestChain = new(bufChain) - Self.lowestChain.new(256) - locker := new(sync.Mutex) - Self.cond = sync.NewCond(locker) -} - -func (Self *PriorityQueue) Push(packager *common.MuxPackager) { - //logs.Warn("push start") - Self.push(packager) - Self.cond.Broadcast() - //logs.Warn("push finish") - return -} - -func (Self *PriorityQueue) push(packager *common.MuxPackager) { - switch packager.Flag { - case common.MUX_PING_FLAG, common.MUX_PING_RETURN: - Self.highestChain.pushHead(unsafe.Pointer(packager)) - // the ping package need highest priority - // prevent ping calculation error - case common.MUX_NEW_CONN, common.MUX_NEW_CONN_OK, common.MUX_NEW_CONN_Fail: - // the new conn package need some priority too - Self.middleChain.pushHead(unsafe.Pointer(packager)) - default: - Self.lowestChain.pushHead(unsafe.Pointer(packager)) - } -} - -const maxStarving uint8 = 8 - -func (Self *PriorityQueue) Pop() (packager *common.MuxPackager) { - var iter bool - for { - packager = Self.TryPop() - if packager != nil { - return - } - if Self.stop { - return - } - if iter { - break - // trying to pop twice - } - iter = true - runtime.Gosched() - } - Self.cond.L.Lock() - defer Self.cond.L.Unlock() - for packager = Self.TryPop(); packager == nil; { - if Self.stop { - return - } - //logs.Warn("queue into wait") - Self.cond.Wait() - // wait for it with no more iter - packager = Self.TryPop() - //logs.Warn("queue wait finish", packager) - } - return -} - -func (Self *PriorityQueue) TryPop() (packager *common.MuxPackager) { - ptr, ok := Self.highestChain.popTail() - if ok { - packager = (*common.MuxPackager)(ptr) - return - } - if Self.starving < maxStarving { - // not pop too much, lowestChain will wait too long - ptr, ok = Self.middleChain.popTail() - if ok { - packager = (*common.MuxPackager)(ptr) - Self.starving++ - return - } - } - ptr, ok = Self.lowestChain.popTail() - if ok { - packager = (*common.MuxPackager)(ptr) - if Self.starving > 0 { - Self.starving = uint8(Self.starving / 2) - } - return - } - if Self.starving > 0 { - ptr, ok = Self.middleChain.popTail() - if ok { - packager = (*common.MuxPackager)(ptr) - Self.starving++ - return - } - } - return -} - -func (Self *PriorityQueue) Stop() { - Self.stop = true - Self.cond.Broadcast() -} - -type ConnQueue struct { - chain *bufChain - starving uint8 - stop bool - cond *sync.Cond -} - -func (Self *ConnQueue) New() { - Self.chain = new(bufChain) - Self.chain.new(32) - locker := new(sync.Mutex) - Self.cond = sync.NewCond(locker) -} - -func (Self *ConnQueue) Push(connection *conn) { - Self.chain.pushHead(unsafe.Pointer(connection)) - Self.cond.Broadcast() - return -} - -func (Self *ConnQueue) Pop() (connection *conn) { - var iter bool - for { - connection = Self.TryPop() - if connection != nil { - return - } - if Self.stop { - return - } - if iter { - break - // trying to pop twice - } - iter = true - runtime.Gosched() - } - Self.cond.L.Lock() - defer Self.cond.L.Unlock() - for connection = Self.TryPop(); connection == nil; { - if Self.stop { - return - } - //logs.Warn("queue into wait") - Self.cond.Wait() - // wait for it with no more iter - connection = Self.TryPop() - //logs.Warn("queue wait finish", packager) - } - return -} - -func (Self *ConnQueue) TryPop() (connection *conn) { - ptr, ok := Self.chain.popTail() - if ok { - connection = (*conn)(ptr) - return - } - return -} - -func (Self *ConnQueue) Stop() { - Self.stop = true - Self.cond.Broadcast() -} - -func NewListElement(buf []byte, l uint16, part bool) (element *common.ListElement, err error) { - if uint16(len(buf)) != l { - err = errors.New("ListElement: buf length not match") - return - } - //if l == 0 { - // logs.Warn("push zero") - //} - element = common.ListElementPool.Get() - element.Buf = buf - element.L = l - element.Part = part - return -} - -type ReceiveWindowQueue struct { - lengthWait uint64 - chain *bufChain - stopOp chan struct{} - readOp chan struct{} - // https://golang.org/pkg/sync/atomic/#pkg-note-BUG - // On non-Linux ARM, the 64-bit functions use instructions unavailable before the ARMv6k core. - // On ARM, x86-32, and 32-bit MIPS, it is the caller's responsibility - // to arrange for 64-bit alignment of 64-bit words accessed atomically. - // The first word in a variable or in an allocated struct, array, or slice can be relied upon to be 64-bit aligned. - timeout time.Time -} - -func NewReceiveWindowQueue() *ReceiveWindowQueue { - queue := ReceiveWindowQueue{ - chain: new(bufChain), - stopOp: make(chan struct{}, 2), - readOp: make(chan struct{}), - } - queue.chain.new(64) - return &queue -} - -func (Self *ReceiveWindowQueue) Push(element *common.ListElement) { - var length, wait uint32 - for { - ptrs := atomic.LoadUint64(&Self.lengthWait) - length, wait = Self.chain.head.unpack(ptrs) - length += uint32(element.L) - if atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(length, 0)) { - break - } - // another goroutine change the length or into wait, make sure - } - //logs.Warn("window push before", Self.Len(), uint32(element.l), len(element.buf)) - Self.chain.pushHead(unsafe.Pointer(element)) - //logs.Warn("window push", Self.Len()) - if wait == 1 { - Self.allowPop() - } - return -} - -func (Self *ReceiveWindowQueue) Pop() (element *common.ListElement, err error) { - var length uint32 -startPop: - ptrs := atomic.LoadUint64(&Self.lengthWait) - length, _ = Self.chain.head.unpack(ptrs) - if length == 0 { - if !atomic.CompareAndSwapUint64(&Self.lengthWait, ptrs, Self.chain.head.pack(0, 1)) { - goto startPop // another goroutine is pushing - } - err = Self.waitPush() - // there is no more data in queue, wait for it - if err != nil { - return - } - goto startPop // wait finish, trying to get the new status - } - // length is not zero, so try to pop - for { - element = Self.TryPop() - if element != nil { - return - } - runtime.Gosched() // another goroutine is still pushing - } -} - -func (Self *ReceiveWindowQueue) TryPop() (element *common.ListElement) { - ptr, ok := Self.chain.popTail() - if ok { - //logs.Warn("window pop before", Self.Len()) - element = (*common.ListElement)(ptr) - atomic.AddUint64(&Self.lengthWait, ^(uint64(element.L)<> dequeueBits) & mask) - tail = uint32(ptrs & mask) - return -} - -func (d *bufDequeue) pack(head, tail uint32) uint64 { - const mask = 1< 0 { - runtime.Gosched() - } - for { - ptrs := atomic.LoadUint64(&d.headTail) - head, tail := d.unpack(ptrs) - if (tail+uint32(len(d.vals)))&(1<= 3 && atomic.LoadUint32(&d.starving) > 0 { - atomic.StoreUint32(&d.starving, 0) - } - break - } - starve++ - if starve >= 3 { - atomic.StoreUint32(&d.starving, 1) - } - } - // The head slot is free, so we own it. - *slot = val - return true -} - -// popTail removes and returns the element at the tail of the queue. -// It returns false if the queue is empty. It may be called by any -// number of consumers. -func (d *bufDequeue) popTail() (unsafe.Pointer, bool) { - ptrs := atomic.LoadUint64(&d.headTail) - head, tail := d.unpack(ptrs) - if tail == head { - // Queue is empty. - return nil, false - } - slot := &d.vals[tail&uint32(len(d.vals)-1)] - var val unsafe.Pointer - for { - val = atomic.LoadPointer(slot) - if val != nil { - // We now own slot. - break - } - // Another goroutine is still pushing data on the tail. - } - - // Tell pushHead that we're done with this slot. Zeroing the - // slot is also important so we don't leave behind references - // that could keep this object live longer than necessary. - // - // We write to val first and then publish that we're done with - atomic.StorePointer(slot, nil) - // At this point pushHead owns the slot. - if tail < math.MaxUint32 { - atomic.AddUint64(&d.headTail, 1) - } else { - atomic.AddUint64(&d.headTail, ^uint64(math.MaxUint32-1)) - } - return val, true -} - -// bufChain is a dynamically-sized version of bufDequeue. -// -// This is implemented as a doubly-linked list queue of poolDequeues -// where each dequeue is double the size of the previous one. Once a -// dequeue fills up, this allocates a new one and only ever pushes to -// the latest dequeue. Pops happen from the other end of the list and -// once a dequeue is exhausted, it gets removed from the list. -type bufChain struct { - // head is the bufDequeue to push to. This is only accessed - // by the producer, so doesn't need to be synchronized. - head *bufChainElt - - // tail is the bufDequeue to popTail from. This is accessed - // by consumers, so reads and writes must be atomic. - tail *bufChainElt - newChain uint32 -} - -type bufChainElt struct { - bufDequeue - - // next and prev link to the adjacent poolChainElts in this - // bufChain. - // - // next is written atomically by the producer and read - // atomically by the consumer. It only transitions from nil to - // non-nil. - // - // prev is written atomically by the consumer and read - // atomically by the producer. It only transitions from - // non-nil to nil. - next, prev *bufChainElt -} - -func storePoolChainElt(pp **bufChainElt, v *bufChainElt) { - atomic.StorePointer((*unsafe.Pointer)(unsafe.Pointer(pp)), unsafe.Pointer(v)) -} - -func loadPoolChainElt(pp **bufChainElt) *bufChainElt { - return (*bufChainElt)(atomic.LoadPointer((*unsafe.Pointer)(unsafe.Pointer(pp)))) -} - -func (c *bufChain) new(initSize int) { - // Initialize the chain. - // initSize must be a power of 2 - d := new(bufChainElt) - d.vals = make([]unsafe.Pointer, initSize) - storePoolChainElt(&c.head, d) - storePoolChainElt(&c.tail, d) -} - -func (c *bufChain) pushHead(val unsafe.Pointer) { -startPush: - for { - if atomic.LoadUint32(&c.newChain) > 0 { - runtime.Gosched() - } else { - break - } - } - - d := loadPoolChainElt(&c.head) - - if d.pushHead(val) { - return - } - - // The current dequeue is full. Allocate a new one of twice - // the size. - if atomic.CompareAndSwapUint32(&c.newChain, 0, 1) { - newSize := len(d.vals) * 2 - if newSize >= dequeueLimit { - // Can't make it any bigger. - newSize = dequeueLimit - } - - d2 := &bufChainElt{prev: d} - d2.vals = make([]unsafe.Pointer, newSize) - d2.pushHead(val) - storePoolChainElt(&c.head, d2) - storePoolChainElt(&d.next, d2) - atomic.StoreUint32(&c.newChain, 0) - return - } - goto startPush -} - -func (c *bufChain) popTail() (unsafe.Pointer, bool) { - d := loadPoolChainElt(&c.tail) - if d == nil { - return nil, false - } - - for { - // It's important that we load the next pointer - // *before* popping the tail. In general, d may be - // transiently empty, but if next is non-nil before - // the TryPop and the TryPop fails, then d is permanently - // empty, which is the only condition under which it's - // safe to drop d from the chain. - d2 := loadPoolChainElt(&d.next) - - if val, ok := d.popTail(); ok { - return val, ok - } - - if d2 == nil { - // This is the only dequeue. It's empty right - // now, but could be pushed to in the future. - return nil, false - } - - // The tail of the chain has been drained, so move on - // to the next dequeue. Try to drop it from the chain - // so the next TryPop doesn't have to look at the empty - // dequeue again. - if atomic.CompareAndSwapPointer((*unsafe.Pointer)(unsafe.Pointer(&c.tail)), unsafe.Pointer(d), unsafe.Pointer(d2)) { - // We won the race. Clear the prev pointer so - // the garbage collector can collect the empty - // dequeue and so popHead doesn't back up - // further than necessary. - storePoolChainElt(&d2.prev, nil) - } - d = d2 - } -} diff --git a/lib/mux/sysGetsock_nowindows.go b/lib/mux/sysGetsock_nowindows.go deleted file mode 100644 index 85d2197..0000000 --- a/lib/mux/sysGetsock_nowindows.go +++ /dev/null @@ -1,46 +0,0 @@ -// +build !windows - -package mux - -import ( - "errors" - "github.com/xtaci/kcp-go" - "net" - "os" - "syscall" -) - -func sysGetSock(fd *os.File) (bufferSize int, err error) { - if fd != nil { - return syscall.GetsockoptInt(int(fd.Fd()), syscall.SOL_SOCKET, syscall.SO_RCVBUF) - } else { - return 5 * 1024 * 1024, nil - } -} - -func getConnFd(c net.Conn) (fd *os.File, err error) { - switch c.(type) { - case *net.TCPConn: - fd, err = c.(*net.TCPConn).File() - if err != nil { - return - } - return - case *net.UDPConn: - fd, err = c.(*net.UDPConn).File() - if err != nil { - return - } - return - case *kcp.UDPSession: - //fd, err = (*net.UDPConn)(unsafe.Pointer(c.(*kcp.UDPSession))).File() - //if err != nil { - // return - //} - // Todo - return - default: - err = errors.New("mux:unknown conn type, only tcp or kcp") - return - } -} diff --git a/lib/mux/sysGetsock_windows.go b/lib/mux/sysGetsock_windows.go deleted file mode 100644 index 579d620..0000000 --- a/lib/mux/sysGetsock_windows.go +++ /dev/null @@ -1,46 +0,0 @@ -// +build windows - -package mux - -import ( - "errors" - "github.com/xtaci/kcp-go" - "net" - "os" -) - -func sysGetSock(fd *os.File) (bufferSize int, err error) { - // https://github.com/golang/sys/blob/master/windows/syscall_windows.go#L1184 - // not support, WTF??? - // Todo - // return syscall.GetsockoptInt((syscall.Handle)(unsafe.Pointer(fd.Fd())), syscall.SOL_SOCKET, syscall.SO_RCVBUF) - bufferSize = 5 * 1024 * 1024 - return -} - -func getConnFd(c net.Conn) (fd *os.File, err error) { - switch c.(type) { - case *net.TCPConn: - //fd, err = c.(*net.TCPConn).File() - //if err != nil { - // return - //} - return - case *net.UDPConn: - //fd, err = c.(*net.UDPConn).File() - //if err != nil { - // return - //} - return - case *kcp.UDPSession: - //fd, err = (*net.UDPConn)(unsafe.Pointer(c.(*kcp.UDPSession))).File() - //if err != nil { - // return - //} - // Todo - return - default: - err = errors.New("mux:unknown conn type, only tcp or kcp") - return - } -} diff --git a/lib/mux/web.go b/lib/mux/web.go deleted file mode 100644 index 36b2017..0000000 --- a/lib/mux/web.go +++ /dev/null @@ -1,154 +0,0 @@ -package mux - -import ( - "fmt" - "github.com/astaxie/beego/logs" - "net/http" - "sort" - "strconv" - "strings" - "sync" - "time" -) - -type connLog struct { - startTime time.Time - isClose bool - logs []string -} - -var logms map[int]*connLog -var logmc map[int]*connLog - -var copyMaps map[int]*connLog -var copyMapc map[int]*connLog -var stashTimeNow time.Time -var mutex sync.Mutex - -func deepCopyMaps() { - copyMaps = make(map[int]*connLog) - for k, v := range logms { - copyMaps[k] = &connLog{ - startTime: v.startTime, - isClose: v.isClose, - logs: v.logs, - } - } -} - -func deepCopyMapc() { - copyMapc = make(map[int]*connLog) - for k, v := range logmc { - copyMapc[k] = &connLog{ - startTime: v.startTime, - isClose: v.isClose, - logs: v.logs, - } - } -} - -func init() { - logms = make(map[int]*connLog) - logmc = make(map[int]*connLog) -} - -type IntSlice []int - -func (s IntSlice) Len() int { return len(s) } - -func (s IntSlice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } - -func (s IntSlice) Less(i, j int) bool { return s[i] < s[j] } - -func NewLogServer() { - http.HandleFunc("/", index) - http.HandleFunc("/detail", detail) - http.HandleFunc("/stash", stash) - fmt.Println(http.ListenAndServe(":8899", nil)) -} - -func stash(w http.ResponseWriter, r *http.Request) { - stashTimeNow = time.Now() - deepCopyMaps() - deepCopyMapc() - w.Write([]byte("ok")) -} - -func getM(label string, id int) (cL *connLog) { - label = strings.TrimSpace(label) - mutex.Lock() - defer mutex.Unlock() - if label == "nps" { - cL = logms[id] - } - if label == "npc" { - cL = logmc[id] - } - return -} - -func setM(label string, id int, cL *connLog) { - label = strings.TrimSpace(label) - mutex.Lock() - defer mutex.Unlock() - if label == "nps" { - logms[id] = cL - } - if label == "npc" { - logmc[id] = cL - } -} - -func index(w http.ResponseWriter, r *http.Request) { - var keys []int - for k := range copyMaps { - keys = append(keys, k) - } - sort.Sort(IntSlice(keys)) - var s string - s += "

nps

" - for _, v := range keys { - connL := copyMaps[v] - s += "" + strconv.Itoa(v) + "----------" - s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------" - s += strconv.FormatBool(connL.isClose) - s += "
" - } - - keys = keys[:0] - s += "

npc

" - for k := range copyMapc { - keys = append(keys, k) - } - sort.Sort(IntSlice(keys)) - - for _, v := range keys { - connL := copyMapc[v] - s += "" + strconv.Itoa(v) + "----------" - s += strconv.Itoa(int(stashTimeNow.Sub(connL.startTime).Milliseconds())) + "ms----------" - s += strconv.FormatBool(connL.isClose) - s += "
" - } - w.Write([]byte(s)) -} - -func detail(w http.ResponseWriter, r *http.Request) { - id := r.FormValue("id") - label := r.FormValue("label") - logs.Warn(label) - i, _ := strconv.Atoi(id) - var v *connLog - if label == "nps" { - v, _ = copyMaps[i] - } - if label == "npc" { - v, _ = copyMapc[i] - } - var s string - if v != nil { - for i, vv := range v.logs { - s += "

" + strconv.Itoa(i+1) + ":" + vv + "

" - } - } - w.Write([]byte(s)) -} diff --git a/lib/mux/web_test.go b/lib/mux/web_test.go deleted file mode 100644 index 91a0430..0000000 --- a/lib/mux/web_test.go +++ /dev/null @@ -1,7 +0,0 @@ -package mux - -import "testing" - -func TestWeb(t *testing.T) { - NewLogServer() -} diff --git a/lib/mux/pconn.go b/lib/pmux/pconn.go similarity index 99% rename from lib/mux/pconn.go rename to lib/pmux/pconn.go index 35af3cc..d4330ef 100644 --- a/lib/mux/pconn.go +++ b/lib/pmux/pconn.go @@ -1,4 +1,4 @@ -package mux +package pmux import ( "net" diff --git a/lib/mux/plistener.go b/lib/pmux/plistener.go similarity index 98% rename from lib/mux/plistener.go rename to lib/pmux/plistener.go index 9bdaabc..deef001 100644 --- a/lib/mux/plistener.go +++ b/lib/pmux/plistener.go @@ -1,4 +1,4 @@ -package mux +package pmux import ( "errors" diff --git a/lib/mux/pmux.go b/lib/pmux/pmux.go similarity index 98% rename from lib/mux/pmux.go rename to lib/pmux/pmux.go index b8de236..0dffb3e 100644 --- a/lib/mux/pmux.go +++ b/lib/pmux/pmux.go @@ -1,6 +1,6 @@ // This module is used for port reuse // Distinguish client, web manager , HTTP and HTTPS according to the difference of protocol -package mux +package pmux import ( "bufio" @@ -139,7 +139,7 @@ func (pMux *PortMux) process(conn net.Conn) { func (pMux *PortMux) Close() error { if pMux.isClose { - return errors.New("the port mux has closed") + return errors.New("the port pmux has closed") } pMux.isClose = true close(pMux.clientConn) diff --git a/lib/mux/pmux_test.go b/lib/pmux/pmux_test.go similarity index 98% rename from lib/mux/pmux_test.go rename to lib/pmux/pmux_test.go index 4c8e44e..6a17ddd 100644 --- a/lib/mux/pmux_test.go +++ b/lib/pmux/pmux_test.go @@ -1,4 +1,4 @@ -package mux +package pmux import ( "testing" diff --git a/lib/version/version.go b/lib/version/version.go index 6a4c8ab..4f59412 100644 --- a/lib/version/version.go +++ b/lib/version/version.go @@ -1,6 +1,6 @@ package version -const VERSION = "0.26.0" +const VERSION = "0.26.1" // Compulsory minimum version, Minimum downward compatibility to this version func GetVersion() string { diff --git a/server/connection/connection.go b/server/connection/connection.go index aac1c34..f614622 100644 --- a/server/connection/connection.go +++ b/server/connection/connection.go @@ -5,12 +5,12 @@ import ( "os" "strconv" - "ehang.io/nps/lib/mux" + "ehang.io/nps/lib/pmux" "github.com/astaxie/beego" "github.com/astaxie/beego/logs" ) -var pMux *mux.PortMux +var pMux *pmux.PortMux var bridgePort string var httpsPort string var httpPort string @@ -28,7 +28,7 @@ func InitConnectionService() { logs.Error(err) os.Exit(0) } - pMux = mux.NewPortMux(port, beego.AppConfig.String("web_host")) + pMux = pmux.NewPortMux(port, beego.AppConfig.String("web_host")) } } diff --git a/server/proxy/http.go b/server/proxy/http.go index 1470c3c..73507ee 100644 --- a/server/proxy/http.go +++ b/server/proxy/http.go @@ -30,11 +30,12 @@ type httpServer struct { httpsServer *http.Server httpsListener net.Listener useCache bool + addOrigin bool cache *cache.Cache cacheLen int } -func NewHttp(bridge *bridge.Bridge, c *file.Tunnel, httpPort, httpsPort int, useCache bool, cacheLen int) *httpServer { +func NewHttp(bridge *bridge.Bridge, c *file.Tunnel, httpPort, httpsPort int, useCache bool, cacheLen int, addOrigin bool) *httpServer { httpServer := &httpServer{ BaseServer: BaseServer{ task: c, @@ -45,6 +46,7 @@ func NewHttp(bridge *bridge.Bridge, c *file.Tunnel, httpPort, httpsPort int, use httpsPort: httpsPort, useCache: useCache, cacheLen: cacheLen, + addOrigin: addOrigin, } if useCache { httpServer.cache = cache.New(cacheLen) @@ -55,7 +57,7 @@ func NewHttp(bridge *bridge.Bridge, c *file.Tunnel, httpPort, httpsPort int, use func (s *httpServer) Start() error { var err error if s.errorContent, err = common.ReadAllFromFile(filepath.Join(common.GetRunPath(), "web", "static", "page", "error.html")); err != nil { - s.errorContent = []byte("easyProxy 404") + s.errorContent = []byte("nps 404") } if s.httpPort > 0 { s.httpServer = s.NewServer(s.httpPort, "http") @@ -116,7 +118,6 @@ func (s *httpServer) handleHttp(c *conn.Conn, r *http.Request) { var ( host *file.Host target net.Conn - lastHost *file.Host err error connClient io.ReadWriteCloser scheme = r.URL.Scheme @@ -133,6 +134,10 @@ func (s *httpServer) handleHttp(c *conn.Conn, r *http.Request) { } c.Close() }() +reset: + if isReset { + host.Client.AddConn() + } if host, err = file.GetDb().GetInfoByHost(r.Host, r); err != nil { logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI) return @@ -141,12 +146,13 @@ func (s *httpServer) handleHttp(c *conn.Conn, r *http.Request) { logs.Warn("client id %d, host id %d, error %s, when https connection", host.Client.Id, host.Id, err.Error()) return } - defer host.Client.AddConn() + if !isReset { + defer host.Client.AddConn() + } if err = s.auth(r, c, host.Client.Cnf.U, host.Client.Cnf.P); err != nil { logs.Warn("auth error", err, r.RemoteAddr) return } -reset: if targetAddr, err = host.Target.GetRandomTarget(); err != nil { logs.Warn(err.Error()) return @@ -157,7 +163,6 @@ reset: return } connClient = conn.GetConn(target, lk.Crypt, lk.Compress, host.Client.Rate, true) - lastHost = host //read from inc-client go func() { @@ -214,7 +219,7 @@ reset: } //change the host and header and set proxy setting - common.ChangeHostAndHeader(r, host.HostChange, host.HeaderChange, c.Conn.RemoteAddr().String()) + common.ChangeHostAndHeader(r, host.HostChange, host.HeaderChange, c.Conn.RemoteAddr().String(), s.addOrigin) logs.Trace("%s request, method %s, host %s, url %s, remote address %s, target %s", r.URL.Scheme, r.Method, r.Host, r.URL.Path, c.RemoteAddr().String(), lk.Host) //write lenConn = conn.NewLenConn(connClient) @@ -235,9 +240,8 @@ reset: if hostTmp, err := file.GetDb().GetInfoByHost(r.Host, r); err != nil { logs.Notice("the url %s %s %s can't be parsed!", r.URL.Scheme, r.Host, r.RequestURI) break - } else if host != lastHost { + } else if host != hostTmp { host = hostTmp - lastHost = host isReset = true connClient.Close() goto reset diff --git a/server/proxy/udp.go b/server/proxy/udp.go index abe2c7f..fa3d0be 100755 --- a/server/proxy/udp.go +++ b/server/proxy/udp.go @@ -55,9 +55,11 @@ func (s *UdpModeServer) process(addr *net.UDPAddr, data []byte) { } defer s.task.Client.AddConn() link := conn.NewLink(common.CONN_UDP, s.task.Target.TargetStr, s.task.Client.Cnf.Crypt, s.task.Client.Cnf.Compress, addr.String(), s.task.Target.LocalProxy) - if target, err := s.bridge.SendLinkInfo(s.task.Client.Id, link, s.task); err != nil { + if clientConn, err := s.bridge.SendLinkInfo(s.task.Client.Id, link, s.task); err != nil { return } else { + target := conn.GetConn(clientConn, s.task.Client.Cnf.Crypt, s.task.Client.Cnf.Compress, nil, true) + defer target.Close() s.task.Flow.Add(int64(len(data)), 0) buf := common.BufPoolUdp.Get().([]byte) defer common.BufPoolUdp.Put(buf) diff --git a/server/server.go b/server/server.go index b1de97e..bae6439 100644 --- a/server/server.go +++ b/server/server.go @@ -7,6 +7,7 @@ import ( "os" "strconv" "strings" + "sync" "time" "ehang.io/nps/bridge" @@ -24,11 +25,11 @@ import ( var ( Bridge *bridge.Bridge - RunList map[int]interface{} + RunList sync.Map //map[int]interface{} ) func init() { - RunList = make(map[int]interface{}) + RunList = sync.Map{} } //init task from db @@ -37,7 +38,8 @@ func InitFromCsv() { if vkey := beego.AppConfig.String("public_vkey"); vkey != "" { c := file.NewClient(vkey, true, true) file.GetDb().NewClient(c) - RunList[c.Id] = nil + RunList.Store(c.Id, nil) + //RunList[c.Id] = nil } //Initialize services in server-side files file.GetDb().JsonDb.Tasks.Range(func(key, value interface{}) bool { @@ -102,7 +104,8 @@ func StartNewServer(bridgePort int, cnf *file.Tunnel, bridgeType string) { if err := svr.Start(); err != nil { logs.Error(err) } - RunList[cnf.Id] = svr + RunList.Store(cnf.Id, svr) + //RunList[cnf.Id] = svr } else { logs.Error("Incorrect startup mode %s", cnf.Mode) } @@ -147,14 +150,16 @@ func NewMode(Bridge *bridge.Bridge, c *file.Tunnel) proxy.Service { httpsPort, _ := beego.AppConfig.Int("https_proxy_port") useCache, _ := beego.AppConfig.Bool("http_cache") cacheLen, _ := beego.AppConfig.Int("http_cache_length") - service = proxy.NewHttp(Bridge, c, httpPort, httpsPort, useCache, cacheLen) + addOrigin, _ := beego.AppConfig.Bool("http_add_origin_header") + service = proxy.NewHttp(Bridge, c, httpPort, httpsPort, useCache, cacheLen, addOrigin) } return service } //stop server func StopServer(id int) error { - if v, ok := RunList[id]; ok { + //if v, ok := RunList[id]; ok { + if v, ok := RunList.Load(id); ok { if svr, ok := v.(proxy.Service); ok { if err := svr.Close(); err != nil { return err @@ -169,7 +174,8 @@ func StopServer(id int) error { t.Status = false file.GetDb().UpdateTask(t) } - delete(RunList, id) + //delete(RunList, id) + RunList.Delete(id) return nil } return errors.New("task is not running") @@ -179,7 +185,8 @@ func StopServer(id int) error { func AddTask(t *file.Tunnel) error { if t.Mode == "secret" || t.Mode == "p2p" { logs.Info("secret task %s start ", t.Remark) - RunList[t.Id] = nil + //RunList[t.Id] = nil + RunList.Store(t.Id, nil) return nil } if b := tool.TestServerPort(t.Port, t.Mode); !b && t.Mode != "httpHostServer" { @@ -191,11 +198,13 @@ func AddTask(t *file.Tunnel) error { } if svr := NewMode(Bridge, t); svr != nil { logs.Info("tunnel task %s start mode:%s port %d", t.Remark, t.Mode, t.Port) - RunList[t.Id] = svr + //RunList[t.Id] = svr + RunList.Store(t.Id, svr) go func() { if err := svr.Start(); err != nil { logs.Error("clientId %d taskId %d start error %s", t.Client.Id, t.Id, err) - delete(RunList, t.Id) + //delete(RunList, t.Id) + RunList.Delete(t.Id) return } }() @@ -219,7 +228,8 @@ func StartTask(id int) error { //delete task func DelTask(id int) error { - if _, ok := RunList[id]; ok { + //if _, ok := RunList[id]; ok { + if _, ok := RunList.Load(id); ok { if err := StopServer(id); err != nil { return err } @@ -249,7 +259,8 @@ func GetTunnel(start, length int, typeVal string, clientId int, search string) ( } if start--; start < 0 { if length--; length >= 0 { - if _, ok := RunList[v.Id]; ok { + //if _, ok := RunList[v.Id]; ok { + if _, ok := RunList.Load(v.Id); ok { v.RunStatus = true } else { v.RunStatus = false diff --git a/web/controllers/base.go b/web/controllers/base.go index 426692f..cf4da6a 100755 --- a/web/controllers/base.go +++ b/web/controllers/base.go @@ -33,10 +33,13 @@ func (s *BaseController) Prepare() { timestamp := s.GetIntNoErr("timestamp") configKey := beego.AppConfig.String("auth_key") timeNowUnix := time.Now().Unix() - if !((math.Abs(float64(timeNowUnix-int64(timestamp))) <= 20) && (crypt.Md5(configKey+strconv.Itoa(timestamp)) == md5Key)) { + if !(md5Key!="" && (math.Abs(float64(timeNowUnix-int64(timestamp))) <= 20) && (crypt.Md5(configKey+strconv.Itoa(timestamp)) == md5Key)) { if s.GetSession("auth") != true { s.Redirect(beego.AppConfig.String("web_base_url")+"/login/index", 302) } + }else { + s.SetSession("isAdmin",true) + s.Data["isAdmin"] = true } if s.GetSession("isAdmin") != nil && !s.GetSession("isAdmin").(bool) { s.Ctx.Input.SetData("client_id", s.GetSession("clientId").(int))