diff --git a/cmd/geth/admin.go b/cmd/geth/admin.go index defbb43ec..bd09291bf 100644 --- a/cmd/geth/admin.go +++ b/cmd/geth/admin.go @@ -28,6 +28,7 @@ func (js *jsre) adminBindings() { admin := t.Object() admin.Set("suggestPeer", js.suggestPeer) admin.Set("startRPC", js.startRPC) + admin.Set("stopRPC", js.stopRPC) admin.Set("nodeInfo", js.nodeInfo) admin.Set("peers", js.peers) admin.Set("newAccount", js.newAccount) @@ -226,6 +227,13 @@ func (js *jsre) startRPC(call otto.FunctionCall) otto.Value { return otto.TrueValue() } +func (js *jsre) stopRPC(call otto.FunctionCall) otto.Value { + if rpc.Stop() == nil { + return otto.TrueValue() + } + return otto.FalseValue() +} + func (js *jsre) suggestPeer(call otto.FunctionCall) otto.Value { nodeURL, err := call.Argument(0).ToString() if err != nil { diff --git a/rpc/http.go b/rpc/http.go index 790442a28..f9c646908 100644 --- a/rpc/http.go +++ b/rpc/http.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "io/ioutil" - "net" "net/http" "github.com/ethereum/go-ethereum/logger" @@ -15,6 +14,7 @@ import ( ) var rpclogger = logger.NewLogger("RPC") +var rpclistener *stoppableTCPListener const ( jsonrpcver = "2.0" @@ -22,11 +22,19 @@ const ( ) func Start(pipe *xeth.XEth, config RpcConfig) error { - l, err := net.Listen("tcp", fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort)) + if rpclistener != nil { + if fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort) != rpclistener.Addr().String() { + return fmt.Errorf("RPC service already running on %s ", rpclistener.Addr().String()) + } + return nil // RPC service already running on given host/port + } + + l, err := newStoppableTCPListener(fmt.Sprintf("%s:%d", config.ListenAddress, config.ListenPort)) if err != nil { rpclogger.Errorf("Can't listen on %s:%d: %v", config.ListenAddress, config.ListenPort, err) return err } + rpclistener = l var handler http.Handler if len(config.CorsDomain) > 0 { @@ -35,9 +43,9 @@ func Start(pipe *xeth.XEth, config RpcConfig) error { opts.AllowedOrigins = []string{config.CorsDomain} c := cors.New(opts) - handler = c.Handler(JSONRPC(pipe)) + handler = newStoppableHandler(c.Handler(JSONRPC(pipe)), l.stop) } else { - handler = JSONRPC(pipe) + handler = newStoppableHandler(JSONRPC(pipe), l.stop) } go http.Serve(l, handler) @@ -45,6 +53,15 @@ func Start(pipe *xeth.XEth, config RpcConfig) error { return nil } +func Stop() error { + if rpclistener != nil { + rpclistener.Stop() + rpclistener = nil + } + + return nil +} + // JSONRPC returns a handler that implements the Ethereum JSON-RPC API. func JSONRPC(pipe *xeth.XEth) http.Handler { api := NewEthereumApi(pipe) diff --git a/rpc/types.go b/rpc/types.go index bc9a46ed5..1784759a4 100644 --- a/rpc/types.go +++ b/rpc/types.go @@ -23,6 +23,13 @@ import ( "math/big" "strings" + "errors" + "net" + "net/http" + "time" + + "io" + "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/types" ) @@ -257,3 +264,95 @@ type RpcErrorObject struct { Message string `json:"message"` // Data interface{} `json:"data"` } + +type listenerHasStoppedError struct { + msg string +} + +func (self listenerHasStoppedError) Error() string { + return self.msg +} + +var listenerStoppedError = listenerHasStoppedError{"Listener stopped"} + +// When https://github.com/golang/go/issues/4674 is fixed this could be replaced +type stoppableTCPListener struct { + *net.TCPListener + stop chan struct{} // closed when the listener must stop +} + +// Wraps the default handler and checks if the RPC service was stopped. In that case it returns an +// error indicating that the service was stopped. This will only happen for connections which are +// kept open (HTTP keep-alive) when the RPC service was shutdown. +func newStoppableHandler(h http.Handler, stop chan struct{}) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + select { + case <-stop: + w.Header().Set("Content-Type", "application/json") + jsonerr := &RpcErrorObject{-32603, "RPC service stopped"} + send(w, &RpcErrorResponse{Jsonrpc: jsonrpcver, Id: nil, Error: jsonerr}) + default: + h.ServeHTTP(w, r) + } + }) +} + +// Stop the listener and all accepted and still active connections. +func (self *stoppableTCPListener) Stop() { + close(self.stop) +} + +func newStoppableTCPListener(addr string) (*stoppableTCPListener, error) { + wl, err := net.Listen("tcp", addr) + if err != nil { + return nil, err + } + + if tcpl, ok := wl.(*net.TCPListener); ok { + stop := make(chan struct{}) + l := &stoppableTCPListener{tcpl, stop} + return l, nil + } + + return nil, errors.New("Unable to create TCP listener for RPC service") +} + +func (self *stoppableTCPListener) Accept() (net.Conn, error) { + for { + self.SetDeadline(time.Now().Add(time.Duration(1 * time.Second))) + c, err := self.TCPListener.AcceptTCP() + + select { + case <-self.stop: + if c != nil { // accept timeout + c.Close() + } + self.TCPListener.Close() + return nil, listenerStoppedError + default: + } + + if err != nil { + if netErr, ok := err.(net.Error); ok && netErr.Timeout() && netErr.Temporary() { + continue // regular timeout + } + } + + return &closableConnection{c, self.stop}, err + } +} + +type closableConnection struct { + *net.TCPConn + closed chan struct{} +} + +func (self *closableConnection) Read(b []byte) (n int, err error) { + select { + case <-self.closed: + self.TCPConn.Close() + return 0, io.EOF + default: + return self.TCPConn.Read(b) + } +}